You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/09/20 20:59:50 UTC
svn commit: r1525101 [15/21] - in /qpid/branches/linearstore/qpid: ./ bin/
cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/
cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2...
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java Fri Sep 20 18:59:30 2013
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol;
+import java.util.ArrayList;
+import java.util.List;
import javax.net.ssl.SSLContext;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.protocol.ServerProtocolEngine;
@@ -29,6 +31,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
@@ -42,6 +46,7 @@ public class MultiVersionProtocolEngineF
private final boolean _needClientAuth;
private final Port _port;
private final Transport _transport;
+ private final ProtocolEngineCreator[] _creators;
public MultiVersionProtocolEngineFactory(Broker broker,
SSLContext sslContext,
@@ -62,6 +67,12 @@ public class MultiVersionProtocolEngineF
_sslContext = sslContext;
_supported = supportedVersions;
_defaultSupportedReply = defaultSupportedReply;
+ List<ProtocolEngineCreator> creators = new ArrayList<ProtocolEngineCreator>();
+ for(ProtocolEngineCreator c : new QpidServiceLoader<ProtocolEngineCreator>().instancesOf(ProtocolEngineCreator.class))
+ {
+ creators.add(c);
+ }
+ _creators = creators.toArray(new ProtocolEngineCreator[creators.size()]);
_wantClientAuth = wantClientAuth;
_needClientAuth = needClientAuth;
_port = port;
@@ -72,7 +83,7 @@ public class MultiVersionProtocolEngineF
{
return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
_supported, _defaultSupportedReply, _port, _transport,
- ID_GENERATOR.getAndIncrement()
- );
+ ID_GENERATOR.getAndIncrement(),
+ _creators);
}
}
Propchange: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:r1501885-1525056
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Sep 20 18:59:30 2013
@@ -76,7 +76,7 @@ public interface AMQQueue extends Compar
boolean isAutoDelete();
- AMQShortString getOwner();
+ String getOwner();
AuthorizationHolder getAuthorizationHolder();
void setAuthorizationHolder(AuthorizationHolder principalHolder);
@@ -225,7 +225,8 @@ public interface AMQQueue extends Compar
void setAlternateExchange(Exchange exchange);
- Map<String, Object> getArguments();
+ Collection<String> getAvailableAttributes();
+ Object getAttribute(String attrName);
void checkCapacity(AMQSessionModel channel);
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Fri Sep 20 18:59:30 2013
@@ -28,57 +28,46 @@ import java.util.UUID;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class AMQQueueFactory
+public class AMQQueueFactory implements QueueFactory
{
- public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
- public static final String X_QPID_CAPACITY = "x-qpid-capacity";
- public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
- public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
- public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
- public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
- public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
-
- public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
- public static final String X_QPID_DESCRIPTION = "x-qpid-description";
- public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
- public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
- public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
- public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
+ public static final String QPID_DEFAULT_LVQ_KEY = "qpid.LVQ_key";
+
- public static final String DLQ_ROUTING_KEY = "dlq";
- public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
- public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
+ public static final String DLQ_ROUTING_KEY = "dlq";
+ private static final int MAX_LENGTH = 255;
+
+ private final VirtualHost _virtualHost;
+ private final QueueRegistry _queueRegistry;
- private AMQQueueFactory()
+ public AMQQueueFactory(VirtualHost virtualHost, QueueRegistry queueRegistry)
{
+ _virtualHost = virtualHost;
+ _queueRegistry = queueRegistry;
}
private abstract static class QueueProperty
{
- private final AMQShortString _argumentName;
+ private final String _argumentName;
public QueueProperty(String argumentName)
{
- _argumentName = new AMQShortString(argumentName);
+ _argumentName = argumentName;
}
- public AMQShortString getArgumentName()
+ public String getArgumentName()
{
return _argumentName;
}
@@ -129,56 +118,56 @@ public class AMQQueueFactory
}
private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
- new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_AGE)
+ new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_AGE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageAge(value);
}
},
- new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_SIZE)
+ new QueueLongProperty(Queue.ALERT_THRESHOLD_MESSAGE_SIZE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageSize(value);
}
},
- new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_COUNT)
+ new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageCount(value);
}
},
- new QueueLongProperty(X_QPID_MAXIMUM_QUEUE_DEPTH)
+ new QueueLongProperty(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumQueueDepth(value);
}
},
- new QueueLongProperty(X_QPID_MINIMUM_ALERT_REPEAT_GAP)
+ new QueueLongProperty(Queue.ALERT_REPEAT_GAP)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMinimumAlertRepeatGap(value);
}
},
- new QueueLongProperty(X_QPID_CAPACITY)
+ new QueueLongProperty(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setCapacity(value);
}
},
- new QueueLongProperty(X_QPID_FLOW_RESUME_CAPACITY)
+ new QueueLongProperty(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setFlowResumeCapacity(value);
}
},
- new QueueIntegerProperty(X_QPID_MAXIMUM_DELIVERY_COUNT)
+ new QueueIntegerProperty(Queue.MAXIMUM_DELIVERY_ATTEMPTS)
{
public void setPropertyValue(AMQQueue queue, int value)
{
@@ -187,15 +176,45 @@ public class AMQQueueFactory
}
};
+ @Override
+ public AMQQueue restoreQueue(UUID id,
+ String queueName,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQSecurityException, AMQException
+ {
+ return createOrRestoreQueue(id, queueName, true, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, false);
+
+ }
+
/**
* @param id the id to use.
+ * @param deleteOnNoConsumer
*/
- public static AMQQueue createAMQQueueImpl(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
+ @Override
+ public AMQQueue createQueue(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments) throws AMQSecurityException, AMQException
+ {
+ return createOrRestoreQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer, arguments, true);
+ }
+
+ private AMQQueue createOrRestoreQueue(UUID id,
+ String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ boolean exclusive,
+ boolean deleteOnNoConsumer,
+ Map<String, Object> arguments,
+ boolean createInStore) throws AMQSecurityException, AMQException
{
if (id == null)
{
@@ -206,16 +225,11 @@ public class AMQQueueFactory
throw new IllegalArgumentException("Queue name must not be null");
}
- // Access check
- if (!virtualHost.getSecurityManager().authoriseCreateQueue(autoDelete, durable, exclusive, null, null, new AMQShortString(queueName), owner))
- {
- String description = "Permission denied: queue-name '" + queueName + "'";
- throw new AMQSecurityException(description);
- }
- QueueConfiguration queueConfiguration = virtualHost.getConfiguration().getQueueConfiguration(queueName);
- boolean isDLQEnabled = isDLQEnabled(autoDelete, arguments, queueConfiguration);
- if (isDLQEnabled)
+ QueueConfiguration queueConfiguration = _virtualHost.getConfiguration().getQueueConfiguration(queueName);
+
+ boolean createDLQ = createDLQ(autoDelete, arguments, queueConfiguration);
+ if (createDLQ)
{
validateDLNames(queueName);
}
@@ -226,17 +240,17 @@ public class AMQQueueFactory
if(arguments != null)
{
- if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
+ if(arguments.containsKey(Queue.LVQ_KEY))
{
- conflationKey = (String) arguments.get(QPID_LAST_VALUE_QUEUE_KEY);
+ conflationKey = (String) arguments.get(Queue.LVQ_KEY);
if(conflationKey == null)
{
- conflationKey = QPID_LVQ_KEY;
+ conflationKey = QPID_DEFAULT_LVQ_KEY;
}
}
- else if(arguments.containsKey(X_QPID_PRIORITIES))
+ else if(arguments.containsKey(Queue.PRIORITIES))
{
- Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
+ Object prioritiesObj = arguments.get(Queue.PRIORITIES);
if(prioritiesObj instanceof Number)
{
priorities = ((Number)prioritiesObj).intValue();
@@ -257,60 +271,67 @@ public class AMQQueueFactory
// TODO - should warn here of invalid format
}
}
- else if(arguments.containsKey(QPID_QUEUE_SORT_KEY))
+ else if(arguments.containsKey(Queue.SORT_KEY))
{
- sortingKey = (String)arguments.get(QPID_QUEUE_SORT_KEY);
+ sortingKey = (String)arguments.get(Queue.SORT_KEY);
}
}
AMQQueue q;
if(sortingKey != null)
{
- q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
+ q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, sortingKey);
}
else if(conflationKey != null)
{
- q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
+ q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, conflationKey);
}
else if(priorities > 1)
{
- q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
+ q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities);
}
else
{
- q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments);
+ q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments);
}
+ q.setDeleteOnNoConsumers(deleteOnNoConsumer);
+
//Register the new queue
- virtualHost.getQueueRegistry().registerQueue(q);
- q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName));
+ _queueRegistry.registerQueue(q);
+
+ q.configure(_virtualHost.getConfiguration().getQueueConfiguration(queueName));
if(arguments != null)
{
for(QueueProperty p : DECLAREABLE_PROPERTIES)
{
- if(arguments.containsKey(p.getArgumentName().toString()))
+ if(arguments.containsKey(p.getArgumentName()))
{
- p.setPropertyValue(q, arguments.get(p.getArgumentName().toString()));
+ p.setPropertyValue(q, arguments.get(p.getArgumentName()));
}
}
+
+ if(arguments.get(Queue.NO_LOCAL) instanceof Boolean)
+ {
+ q.setNoLocal((Boolean)arguments.get(Queue.NO_LOCAL));
+ }
+
}
- if(isDLQEnabled)
+ if(createDLQ)
{
final String dlExchangeName = getDeadLetterExchangeName(queueName);
final String dlQueueName = getDeadLetterQueueName(queueName);
- final QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-
Exchange dlExchange = null;
- final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName());
+ final UUID dlExchangeId = UUIDGenerator.generateExchangeUUID(dlExchangeName, _virtualHost.getName());
try
{
- dlExchange = virtualHost.createExchange(dlExchangeId,
+ dlExchange = _virtualHost.createExchange(dlExchangeId,
dlExchangeName,
- ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(),
+ ExchangeDefaults.FANOUT_EXCHANGE_CLASS,
true, false, null);
}
catch(ExchangeExistsException e)
@@ -321,23 +342,19 @@ public class AMQQueueFactory
AMQQueue dlQueue = null;
- synchronized(queueRegistry)
+ synchronized(_queueRegistry)
{
- dlQueue = queueRegistry.getQueue(dlQueueName);
+ dlQueue = _queueRegistry.getQueue(dlQueueName);
if(dlQueue == null)
{
//set args to disable DLQ'ing/MDC from the DLQ itself, preventing loops etc
final Map<String, Object> args = new HashMap<String, Object>();
- args.put(X_QPID_DLQ_ENABLED, false);
- args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
-
- dlQueue = createAMQQueueImpl(UUIDGenerator.generateQueueUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
+ args.put(Queue.CREATE_DLQ_ON_CREATION, false);
+ args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
- //enter the dlq in the persistent store
- DurableConfigurationStoreHelper.createQueue(virtualHost.getDurableConfigurationStore(),
- dlQueue,
- FieldTable.convertToFieldTable(args));
+ dlQueue = _virtualHost.createQueue(UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()), dlQueueName, true, owner, false, exclusive,
+ false, args);
}
}
@@ -350,11 +367,31 @@ public class AMQQueueFactory
}
q.setAlternateExchange(dlExchange);
}
+ else if(arguments != null && arguments.get(Queue.ALTERNATE_EXCHANGE) instanceof String)
+ {
+
+ final String altExchangeAttr = (String) arguments.get(Queue.ALTERNATE_EXCHANGE);
+ Exchange altExchange;
+ try
+ {
+ altExchange = _virtualHost.getExchange(UUID.fromString(altExchangeAttr));
+ }
+ catch(IllegalArgumentException e)
+ {
+ altExchange = _virtualHost.getExchange(altExchangeAttr);
+ }
+ q.setAlternateExchange(altExchange);
+ }
+
+ if (createInStore && q.isDurable() && !q.isAutoDelete())
+ {
+ DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(), q);
+ }
return q;
}
- public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
+ public AMQQueue createAMQQueueImpl(QueueConfiguration config) throws AMQException
{
String queueName = config.getName();
@@ -365,9 +402,9 @@ public class AMQQueueFactory
Map<String, Object> arguments = createQueueArgumentsFromConfig(config);
// we need queues that are defined in config to have deterministic ids.
- UUID id = UUIDGenerator.generateQueueUUID(queueName, host.getName());
+ UUID id = UUIDGenerator.generateQueueUUID(queueName, _virtualHost.getName());
- AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, host, arguments);
+ AMQQueue q = createQueue(id, queueName, durable, owner, autodelete, exclusive, false, arguments);
q.configure(config);
return q;
}
@@ -390,16 +427,16 @@ public class AMQQueueFactory
{
// check if DLQ name and DLQ exchange name do not exceed 255
String exchangeName = getDeadLetterExchangeName(name);
- if (exchangeName.length() > AMQShortString.MAX_LENGTH)
+ if (exchangeName.length() > MAX_LENGTH)
{
throw new IllegalArgumentException("DL exchange name '" + exchangeName
- + "' length exceeds limit of " + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+ + "' length exceeds limit of " + MAX_LENGTH + " characters for queue " + name);
}
String queueName = getDeadLetterQueueName(name);
- if (queueName.length() > AMQShortString.MAX_LENGTH)
+ if (queueName.length() > MAX_LENGTH)
{
throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of "
- + AMQShortString.MAX_LENGTH + " characters for queue " + name);
+ + MAX_LENGTH + " characters for queue " + name);
}
}
@@ -414,21 +451,23 @@ public class AMQQueueFactory
* queue configuration
* @return true if DLQ enabled
*/
- protected static boolean isDLQEnabled(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
+ protected static boolean createDLQ(boolean autoDelete, Map<String, Object> arguments, QueueConfiguration qConfig)
{
//feature is not to be enabled for temporary queues or when explicitly disabled by argument
- if (!autoDelete)
+ if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE))))
{
- boolean dlqArgumentPresent = arguments != null && arguments.containsKey(X_QPID_DLQ_ENABLED);
+ boolean dlqArgumentPresent = arguments != null
+ && arguments.containsKey(Queue.CREATE_DLQ_ON_CREATION);
if (dlqArgumentPresent || qConfig.isDeadLetterQueueEnabled())
{
boolean dlqEnabled = true;
if (dlqArgumentPresent)
{
- Object argument = arguments.get(X_QPID_DLQ_ENABLED);
- dlqEnabled = argument instanceof Boolean && ((Boolean)argument).booleanValue();
+ Object argument = arguments.get(Queue.CREATE_DLQ_ON_CREATION);
+ dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue())
+ || (argument instanceof String && Boolean.parseBoolean(argument.toString()));
}
- return dlqEnabled;
+ return dlqEnabled ;
}
}
return false;
@@ -464,31 +503,30 @@ public class AMQQueueFactory
if(config.getArguments() != null && !config.getArguments().isEmpty())
{
- arguments.putAll(config.getArguments());
+ arguments.putAll(QueueArgumentsConverter.convertWireArgsToModel(new HashMap<String, Object>(config.getArguments())));
}
if(config.isLVQ() || config.getLVQKey() != null)
{
- arguments.put(QPID_LAST_VALUE_QUEUE, 1);
- arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
+ arguments.put(Queue.LVQ_KEY, config.getLVQKey() == null ? QPID_DEFAULT_LVQ_KEY : config.getLVQKey());
}
else if (config.getPriority() || config.getPriorities() > 0)
{
- arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
+ arguments.put(Queue.PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
}
else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
{
- arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
+ arguments.put(Queue.SORT_KEY, config.getQueueSortKey());
}
if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
{
- arguments.put(X_QPID_DLQ_ENABLED, true);
+ arguments.put(Queue.CREATE_DLQ_ON_CREATION, true);
}
if (config.getDescription() != null && !"".equals(config.getDescription()))
{
- arguments.put(X_QPID_DESCRIPTION, config.getDescription());
+ arguments.put(Queue.DESCRIPTION, config.getDescription());
}
if (arguments.isEmpty())
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Fri Sep 20 18:59:30 2013
@@ -40,5 +40,5 @@ public interface BaseQueue extends Trans
boolean isDurable();
boolean isDeleted();
- AMQShortString getNameShortString();
+ String getName();
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Fri Sep 20 18:59:30 2013
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
@@ -31,7 +30,7 @@ import java.util.concurrent.ConcurrentMa
public class DefaultQueueRegistry implements QueueRegistry
{
- private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
+ private ConcurrentMap<String, AMQQueue> _queueMap = new ConcurrentHashMap<String, AMQQueue>();
private final VirtualHost _virtualHost;
private final Collection<RegistryChangeListener> _listeners =
@@ -49,7 +48,7 @@ public class DefaultQueueRegistry implem
public void registerQueue(AMQQueue queue)
{
- _queueMap.put(queue.getNameShortString(), queue);
+ _queueMap.put(queue.getName(), queue);
synchronized (_listeners)
{
for(RegistryChangeListener listener : _listeners)
@@ -59,7 +58,7 @@ public class DefaultQueueRegistry implem
}
}
- public void unregisterQueue(AMQShortString name)
+ public void unregisterQueue(String name)
{
AMQQueue q = _queueMap.remove(name);
if(q != null)
@@ -74,15 +73,6 @@ public class DefaultQueueRegistry implem
}
}
- public AMQQueue getQueue(AMQShortString name)
- {
- return _queueMap.get(name);
- }
-
- public Collection<AMQShortString> getQueueNames()
- {
- return _queueMap.keySet();
- }
public Collection<AMQQueue> getQueues()
{
@@ -91,7 +81,7 @@ public class DefaultQueueRegistry implem
public AMQQueue getQueue(String queue)
{
- return getQueue(new AMQShortString(queue));
+ return queue == null ? null : _queueMap.get(queue);
}
public void addRegistryChangeListener(RegistryChangeListener listener)
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java Fri Sep 20 18:59:30 2013
@@ -44,12 +44,6 @@ public class InboundMessageAdapter imple
_entry = entry;
}
-
- public AMQShortString getRoutingKeyShortString()
- {
- return AMQShortString.valueOf(_entry.getMessage().getRoutingKey());
- }
-
public String getRoutingKey()
{
return _entry.getMessage().getRoutingKey();
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Fri Sep 20 18:59:30 2013
@@ -28,7 +28,7 @@ public enum NotificationCheck
MESSAGE_COUNT_ALERT
{
- public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
+ public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
{
int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
@@ -45,7 +45,7 @@ public enum NotificationCheck
},
MESSAGE_SIZE_ALERT(true)
{
- public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
+ public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
{
final long maximumMessageSize = queue.getMaximumMessageSize();
if(maximumMessageSize != 0)
@@ -57,7 +57,7 @@ public enum NotificationCheck
if (messageSize >= maximumMessageSize)
{
String notificationMsg = messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]";
-
+
logNotification(this, queue, notificationMsg);
listener.notifyClients(this, queue, notificationMsg);
return true;
@@ -69,7 +69,7 @@ public enum NotificationCheck
},
QUEUE_DEPTH_ALERT
{
- public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
+ public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
{
// Check for threshold queue depth in bytes
final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -81,7 +81,7 @@ public enum NotificationCheck
if (queueDepth >= maximumQueueDepth)
{
String notificationMsg = (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.";
-
+
logNotification(this, queue, notificationMsg);
listener.notifyClients(this, queue, notificationMsg);
return true;
@@ -93,7 +93,7 @@ public enum NotificationCheck
},
MESSAGE_AGE_ALERT
{
- public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
+ public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
{
final long maxMessageAge = queue.getMaximumMessageAge();
@@ -107,7 +107,7 @@ public enum NotificationCheck
{
long oldestAge = currentTime - firstArrivalTime;
String notificationMsg = (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.";
-
+
logNotification(this, queue, notificationMsg);
listener.notifyClients(this, queue, notificationMsg);
@@ -115,7 +115,7 @@ public enum NotificationCheck
}
}
return false;
-
+
}
}
@@ -140,11 +140,11 @@ public enum NotificationCheck
return _messageSpecific;
}
- public abstract boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener);
+ public abstract boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener);
//A bit of a hack, only for use until we do the logging listener
private static void logNotification(NotificationCheck notification, AMQQueue queue, String notificationMsg)
{
- LOGGER.info(notification.name() + " On Queue " + queue.getNameShortString() + " - " + notificationMsg);
+ LOGGER.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
}
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Fri Sep 20 18:59:30 2013
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -32,11 +31,7 @@ public interface QueueRegistry
void registerQueue(AMQQueue queue);
- void unregisterQueue(AMQShortString name);
-
- AMQQueue getQueue(AMQShortString name);
-
- Collection<AMQShortString> getQueueNames();
+ void unregisterQueue(String name);
Collection<AMQQueue> getQueues();
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Sep 20 18:59:30 2013
@@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -51,6 +51,7 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
@@ -68,21 +69,19 @@ public class SimpleAMQQueue implements A
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
- public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
- public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
public static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
- private static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
private static final String QPID_NO_GROUP = "qpid.no-group";
private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP);
+
// TODO - should make this configurable at the vhost / broker level
private static final int DEFAULT_MAX_GROUPS = 255;
private final VirtualHost _virtualHost;
- private final AMQShortString _name;
+ private final String _name;
/** null means shared */
- private final AMQShortString _owner;
+ private final String _owner;
private AuthorizationHolder _authorizationHolder;
@@ -195,25 +194,16 @@ public class SimpleAMQQueue implements A
private AMQQueue.NotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
- protected SimpleAMQQueue(UUID id, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
- {
- this(id, name, durable, owner, autoDelete, exclusive,virtualHost, new SimpleQueueEntryList.Factory(), arguments);
- }
public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
{
this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
}
- public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
- {
- this(id, queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments);
- }
-
protected SimpleAMQQueue(UUID id,
- AMQShortString name,
+ String name,
boolean durable,
- AMQShortString owner,
+ String owner,
boolean autoDelete,
boolean exclusive,
VirtualHost virtualHost,
@@ -237,7 +227,7 @@ public class SimpleAMQQueue implements A
_exclusive = exclusive;
_virtualHost = virtualHost;
_entries = entryListFactory.createQueueEntryList(this);
- _arguments = arguments == null ? new HashMap<String, Object>() : new HashMap<String, Object>(arguments);
+ _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments));
_id = id;
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
@@ -255,19 +245,21 @@ public class SimpleAMQQueue implements A
durable, !durable,
_entries.getPriorities() > 0));
- if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
+ if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY))
{
- if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals(SHARED_MSG_GROUP_ARG_VALUE))
+ if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
+ && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
{
- Object defaultGroup = arguments.get(QPID_DEFAULT_MESSAGE_GROUP_ARG);
+ Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
_messageGroupManager =
- new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)),
+ new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
this);
}
else
{
- _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), DEFAULT_MAX_GROUPS);
+ _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
+ Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
else
@@ -310,11 +302,6 @@ public class SimpleAMQQueue implements A
}
}
- public AMQShortString getNameShortString()
- {
- return _name;
- }
-
public void setNoLocal(boolean nolocal)
{
_nolocal = nolocal;
@@ -358,13 +345,17 @@ public class SimpleAMQQueue implements A
_alternateExchange = exchange;
}
- /**
- * Arguments used to create this queue. The caller is assured
- * that null will never be returned.
- */
- public Map<String, Object> getArguments()
+
+ @Override
+ public Collection<String> getAvailableAttributes()
{
- return _arguments;
+ return new ArrayList<String>(_arguments.keySet());
+ }
+
+ @Override
+ public Object getAttribute(String attrName)
+ {
+ return _arguments.get(attrName);
}
public boolean isAutoDelete()
@@ -372,7 +363,7 @@ public class SimpleAMQQueue implements A
return _autoDelete;
}
- public AMQShortString getOwner()
+ public String getOwner()
{
return _owner;
}
@@ -395,7 +386,7 @@ public class SimpleAMQQueue implements A
public String getName()
{
- return getNameShortString().toString();
+ return _name;
}
// ------ Manage Subscriptions
@@ -511,7 +502,7 @@ public class SimpleAMQQueue implements A
_logger.info("Auto-deleteing queue:" + this);
}
- delete();
+ getVirtualHost().removeQueue(this);
// we need to manually fire the event to the removed subscription (which was the last one left for this
// queue. This is because the delete method uses the subscription set which has just been cleared
@@ -1061,7 +1052,7 @@ public class SimpleAMQQueue implements A
public int compareTo(final AMQQueue o)
{
- return _name.compareTo(o.getNameShortString());
+ return _name.compareTo(o.getName());
}
public AtomicInteger getAtomicQueueCount()
@@ -1340,7 +1331,6 @@ public class SimpleAMQQueue implements A
}
}
- _virtualHost.getQueueRegistry().unregisterQueue(_name);
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -2192,7 +2182,7 @@ public class SimpleAMQQueue implements A
@Override
public String toString()
{
- return String.valueOf(getNameShortString());
+ return getName();
}
public long getUnackedMessageCountHigh()
@@ -2282,18 +2272,18 @@ public class SimpleAMQQueue implements A
{
if (description == null)
{
- _arguments.remove(AMQQueueFactory.X_QPID_DESCRIPTION);
+ _arguments.remove(Queue.DESCRIPTION);
}
else
{
- _arguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, description);
+ _arguments.put(Queue.DESCRIPTION, description);
}
}
@Override
public String getDescription()
{
- return (String) _arguments.get(AMQQueueFactory.X_QPID_DESCRIPTION);
+ return (String) _arguments.get(Queue.DESCRIPTION);
}
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Fri Sep 20 18:59:30 2013
@@ -33,6 +33,7 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer;
import org.apache.qpid.server.configuration.RecovererProvider;
import org.apache.qpid.server.configuration.startup.DefaultRecovererProvider;
+import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
import org.apache.qpid.server.logging.Log4jMessageLogger;
import org.apache.qpid.server.logging.LogActor;
@@ -113,7 +114,8 @@ public class ApplicationRegistry impleme
_taskExecutor = new TaskExecutor();
_taskExecutor.start();
- RecovererProvider provider = new DefaultRecovererProvider((StatisticsGatherer)this, _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor, brokerOptions);
+ StoreConfigurationChangeListener storeChangeListener = new StoreConfigurationChangeListener(_store);
+ RecovererProvider provider = new DefaultRecovererProvider((StatisticsGatherer)this, _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor, brokerOptions, storeChangeListener);
ConfiguredObjectRecoverer<? extends ConfiguredObject> brokerRecoverer = provider.getRecoverer(Broker.class.getSimpleName());
_broker = (Broker) brokerRecoverer.create(provider, _store.getRootEntry());
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java Fri Sep 20 18:59:30 2013
@@ -20,7 +20,6 @@ package org.apache.qpid.server.security;
import org.apache.log4j.Logger;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.model.AccessControlProvider;
@@ -44,6 +43,7 @@ import static org.apache.qpid.server.sec
import static org.apache.qpid.server.security.access.ObjectType.QUEUE;
import static org.apache.qpid.server.security.access.ObjectType.USER;
import static org.apache.qpid.server.security.access.ObjectType.VIRTUALHOST;
+import static org.apache.qpid.server.security.access.Operation.ACCESS_LOGS;
import static org.apache.qpid.server.security.access.Operation.BIND;
import static org.apache.qpid.server.security.access.Operation.CONFIGURE;
import static org.apache.qpid.server.security.access.Operation.CONSUME;
@@ -166,12 +166,12 @@ public class SecurityManager implements
{
String pluginTypeName = getPluginTypeName(accessControl);
_hostPlugins.put(pluginTypeName, accessControl);
-
+
if(_logger.isDebugEnabled())
{
_logger.debug("Added access control to host plugins with name: " + vhostName);
}
-
+
break;
}
}
@@ -289,7 +289,7 @@ public class SecurityManager implements
return true;
}
- public boolean authoriseBind(final Exchange exch, final AMQQueue queue, final AMQShortString routingKey)
+ public boolean authoriseBind(final Exchange exch, final AMQQueue queue, final String routingKey)
{
return checkAllPlugins(new AccessCheck()
{
@@ -351,8 +351,8 @@ public class SecurityManager implements
});
}
- public boolean authoriseCreateExchange(final Boolean autoDelete, final Boolean durable, final AMQShortString exchangeName,
- final Boolean internal, final Boolean nowait, final Boolean passive, final AMQShortString exchangeType)
+ public boolean authoriseCreateExchange(final Boolean autoDelete, final Boolean durable, final String exchangeName,
+ final Boolean internal, final Boolean nowait, final Boolean passive, final String exchangeType)
{
return checkAllPlugins(new AccessCheck()
{
@@ -365,7 +365,7 @@ public class SecurityManager implements
}
public boolean authoriseCreateQueue(final Boolean autoDelete, final Boolean durable, final Boolean exclusive,
- final Boolean nowait, final Boolean passive, final AMQShortString queueName, final String owner)
+ final Boolean nowait, final Boolean passive, final String queueName, final String owner)
{
return checkAllPlugins(new AccessCheck()
{
@@ -491,7 +491,7 @@ public class SecurityManager implements
});
}
- public boolean authoriseUnbind(final Exchange exch, final AMQShortString routingKey, final AMQQueue queue)
+ public boolean authoriseUnbind(final Exchange exch, final String routingKey, final AMQQueue queue)
{
return checkAllPlugins(new AccessCheck()
{
@@ -629,4 +629,15 @@ public class SecurityManager implements
});
}
+ public boolean authoriseLogsAccess()
+ {
+ return checkAllPlugins(new AccessCheck()
+ {
+ Result allowed(AccessControl plugin)
+ {
+ return plugin.authorise(ACCESS_LOGS, BROKER, ObjectProperties.EMPTY);
+ }
+ });
+ }
+
}
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java Fri Sep 20 18:59:30 2013
@@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.EqualsBuilder;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
@@ -136,12 +135,6 @@ public class ObjectProperties
setName(name);
}
-
- public ObjectProperties(AMQShortString name)
- {
- setName(name);
- }
-
public ObjectProperties(AMQQueue queue)
{
setName(queue.getName());
@@ -164,7 +157,7 @@ public class ObjectProperties
}
}
- public ObjectProperties(Exchange exch, AMQQueue queue, AMQShortString routingKey)
+ public ObjectProperties(Exchange exch, AMQQueue queue, String routingKey)
{
this(queue);
@@ -174,11 +167,6 @@ public class ObjectProperties
put(Property.ROUTING_KEY, routingKey);
}
- public ObjectProperties(Exchange exch, AMQShortString routingKey)
- {
- this(exch.getName(), routingKey.asString());
- }
-
public ObjectProperties(String exchangeName, String routingKey, Boolean immediate)
{
this(exchangeName, routingKey);
@@ -195,8 +183,8 @@ public class ObjectProperties
put(Property.ROUTING_KEY, routingKey);
}
- public ObjectProperties(Boolean autoDelete, Boolean durable, AMQShortString exchangeName,
- Boolean internal, Boolean nowait, Boolean passive, AMQShortString exchangeType)
+ public ObjectProperties(Boolean autoDelete, Boolean durable, String exchangeName,
+ Boolean internal, Boolean nowait, Boolean passive, String exchangeType)
{
super();
@@ -212,7 +200,7 @@ public class ObjectProperties
}
public ObjectProperties(Boolean autoDelete, Boolean durable, Boolean exclusive, Boolean nowait, Boolean passive,
- AMQShortString queueName, String owner)
+ String queueName, String owner)
{
super();
@@ -257,16 +245,6 @@ public class ObjectProperties
_properties.put(Property.NAME, name);
}
- public void setName(AMQShortString name)
- {
- put(Property.NAME, name);
- }
-
- public String put(Property key, AMQShortString value)
- {
- return put(key, value == null ? "" : value.asString());
- }
-
public String put(Property key, String value)
{
return _properties.put(key, value == null ? "" : value.trim());
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectType.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectType.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectType.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectType.java Fri Sep 20 18:59:30 2013
@@ -19,6 +19,7 @@
package org.apache.qpid.server.security.access;
import static org.apache.qpid.server.security.access.Operation.ACCESS;
+import static org.apache.qpid.server.security.access.Operation.ACCESS_LOGS;
import static org.apache.qpid.server.security.access.Operation.BIND;
import static org.apache.qpid.server.security.access.Operation.CONFIGURE;
import static org.apache.qpid.server.security.access.Operation.CONSUME;
@@ -50,7 +51,7 @@ public enum ObjectType
METHOD(Operation.ALL, ACCESS, UPDATE),
USER(Operation.ALL, CREATE, DELETE, UPDATE),
GROUP(Operation.ALL, CREATE, DELETE, UPDATE),
- BROKER(Operation.ALL, CONFIGURE);
+ BROKER(Operation.ALL, CONFIGURE, ACCESS_LOGS);
private EnumSet<Operation> _actions;
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Operation.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Operation.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Operation.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/Operation.java Fri Sep 20 18:59:30 2013
@@ -33,7 +33,8 @@ public enum Operation
DELETE,
PURGE,
UPDATE,
- CONFIGURE;
+ CONFIGURE,
+ ACCESS_LOGS;
public static Operation parse(String text)
{
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java Fri Sep 20 18:59:30 2013
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,13 +61,13 @@ public class AmqPlainSaslServer implemen
try
{
final FieldTable ft = FieldTableFactory.newFieldTable(new DataInputStream(new ByteArrayInputStream(response)), response.length);
- String username = (String) ft.getString("LOGIN");
+ String username = ft.getString("LOGIN");
// we do not care about the prompt but it throws if null
NameCallback nameCb = new NameCallback("prompt", username);
// we do not care about the prompt but it throws if null
PasswordCallback passwordCb = new PasswordCallback("prompt", false);
// TODO: should not get pwd as a String but as a char array...
- String pwd = (String) ft.getString("PASSWORD");
+ String pwd = ft.getString("PASSWORD");
AuthorizeCallback authzCb = new AuthorizeCallback(username, username);
Callback[] callbacks = new Callback[]{nameCb, passwordCb, authzCb};
_cbh.handle(callbacks);
Modified: qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/external/ExternalSaslServer.java
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/external/ExternalSaslServer.java?rev=1525101&r1=1525100&r2=1525101&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/external/ExternalSaslServer.java (original)
+++ qpid/branches/linearstore/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/external/ExternalSaslServer.java Fri Sep 20 18:59:30 2013
@@ -26,6 +26,7 @@ import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
public class ExternalSaslServer implements SaslServer
{
@@ -88,7 +89,6 @@ public class ExternalSaslServer implemen
if (_externalPrincipal instanceof X500Principal && !_useFullDN)
{
// Construct username as <CN>@<DC1>.<DC2>.<DC3>....<DCN>
-
String username;
String dn = ((X500Principal) _externalPrincipal).getName(X500Principal.RFC2253);
@@ -97,62 +97,21 @@ public class ExternalSaslServer implemen
LOGGER.debug("Parsing username from Principal DN: " + dn);
}
- if (dn.contains("CN="))
+ username = SSLUtil.getIdFromSubjectDN(dn);
+ if (username.isEmpty())
{
- username = dn.substring(dn.indexOf("CN=") + 3, (dn.indexOf(",", dn.indexOf("CN=")) != -1) ? dn.indexOf(",", dn.indexOf("CN=")) : dn.length());
-
- if (username.isEmpty())
- {
- // CN is empty => Cannot construct username => Authentication failed => return null
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("CN value was empty in Principal name, unable to construct username");
- }
- return null;
- }
- else
- {
- if (dn.contains("DC="))
- {
- int start = 0;
- String dc = "";
-
- while (dn.indexOf("DC=", start) != -1)
- {
- int dcStart = dn.indexOf("DC=", start) + 3;
- int dcEnd = (dn.indexOf(",", dn.indexOf("DC=", start)) != -1) ? dn.indexOf(",", dn.indexOf("DC=", start)) : dn.length();
-
- if (dc.isEmpty())
- {
- dc = dn.substring(dcStart, dcEnd);
- }
- else
- {
- dc = dc.concat(".").concat(dn.substring(dcStart, dcEnd));
- }
-
- start = dn.indexOf("DC=", start) + 1;
- }
-
- username = username.concat("@").concat(dc);
- }
- }
-
+ // CN is empty => Cannot construct username => Authentication failed => return null
if(LOGGER.isDebugEnabled())
{
- LOGGER.debug("Constructing Principal with username: " + username);
+ LOGGER.debug("CN value was empty in Principal name, unable to construct username");
}
- return new UsernamePrincipal(username);
+ return null;
}
- else
+ if(LOGGER.isDebugEnabled())
{
- // No CN => Cannot construct username => Authentication failed => return null
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("No CN= present in DN, unable to construct username");
- }
- return null;
+ LOGGER.debug("Constructing Principal with username: " + username);
}
+ return new UsernamePrincipal(username);
}
else
{
@@ -160,8 +119,7 @@ public class ExternalSaslServer implemen
{
LOGGER.debug("Using external Principal: " + _externalPrincipal);
}
-
return _externalPrincipal;
}
}
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org