You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/17 21:48:06 UTC
svn commit: r1569109 [2/5] - in /qpid/trunk/qpid/java: ./
amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/
bdbstore/src/test/java/org/apache/qpid/server/store/berk...
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java Mon Feb 17 20:48:05 2014
@@ -20,23 +20,31 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
-import java.util.UUID;
public class PriorityQueue extends OutOfOrderQueue<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList>
{
- protected PriorityQueue(UUID id,
- final String name,
- final boolean durable,
- final String owner,
- final boolean autoDelete,
- boolean exclusive,
- final VirtualHost virtualHost,
- Map<String, Object> arguments, int priorities)
+
+ public static final int DEFAULT_PRIORITY_LEVELS = 10;
+
+ protected PriorityQueue(VirtualHost virtualHost,
+ final AMQSessionModel creatingSession,
+ Map<String, Object> attributes)
+ {
+ super(virtualHost, creatingSession, attributes, entryList(attributes));
+ }
+
+ private static PriorityQueueList.Factory entryList(final Map<String, Object> attributes)
{
- super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
+ final Integer priorities = MapValueConverter.getIntegerAttribute(Queue.PRIORITIES, attributes,
+ DEFAULT_PRIORITY_LEVELS);
+
+ return new PriorityQueueList.Factory(priorities);
}
public int getPriorities()
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Mon Feb 17 20:48:05 2014
@@ -109,7 +109,7 @@ public class QueueArgumentsConverter
}
if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
{
- modelArguments.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY);
+ modelArguments.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY);
}
if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java Mon Feb 17 20:48:05 2014
@@ -22,25 +22,15 @@ package org.apache.qpid.server.queue;
import java.util.Map;
import java.util.UUID;
+
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.QpidSecurityException;
public interface QueueFactory
{
- AMQQueue createQueue(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
+ AMQQueue createQueue(final AMQSessionModel creatingSession,
Map<String, Object> arguments) throws QpidSecurityException;
- AMQQueue restoreQueue(UUID id,
- String queueName,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
- Map<String, Object> arguments) throws QpidSecurityException;
+ AMQQueue restoreQueue(Map<String, Object> arguments) throws QpidSecurityException;
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Feb 17 20:48:05 2014
@@ -18,6 +18,7 @@
*/
package org.apache.qpid.server.queue;
+import java.security.Principal;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -28,11 +29,14 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
@@ -50,12 +54,16 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.Deletable;
+import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -78,19 +86,10 @@ abstract class SimpleAMQQueue<E extends
private final String _name;
/** null means shared */
- private final String _owner;
-
- private AuthorizationHolder _authorizationHolder;
-
- private boolean _exclusive = false;
- private AMQSessionModel _exclusiveOwner;
-
+ private String _description;
private final boolean _durable;
- /** If true, this queue is deleted when the last subscriber is removed */
- private final boolean _autoDelete;
-
private Exchange _alternateExchange;
@@ -142,6 +141,10 @@ abstract class SimpleAMQQueue<E extends
private long _flowResumeCapacity;
+ private ExclusivityPolicy _exclusivityPolicy;
+ private LifetimePolicy _lifetimePolicy;
+ private Object _exclusiveOwner; // could be connection, session or Principal
+
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
@@ -157,7 +160,8 @@ abstract class SimpleAMQQueue<E extends
private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
- private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>();
+ private final List<Action<? super Q>> _deleteTaskList =
+ new CopyOnWriteArrayList<Action<? super Q>>();
private LogSubject _logSubject;
@@ -184,16 +188,98 @@ abstract class SimpleAMQQueue<E extends
private AMQQueue.NotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
-
- protected SimpleAMQQueue(UUID id,
- String name,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- VirtualHost virtualHost,
- QueueEntryListFactory<E,Q,L> entryListFactory, Map<String,Object> arguments)
+ protected SimpleAMQQueue(VirtualHost virtualHost,
+ final AMQSessionModel<?,?> creatingSession,
+ Map<String, Object> attributes,
+ QueueEntryListFactory<E, Q, L> entryListFactory)
{
+ UUID id = MapValueConverter.getUUIDAttribute(Queue.ID, attributes);
+ String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes);
+ boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE,attributes,false);
+
+
+ _exclusivityPolicy = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,
+ Queue.EXCLUSIVE,
+ attributes,
+ ExclusivityPolicy.NONE);
+ _lifetimePolicy = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+ Queue.LIFETIME_POLICY,
+ attributes,
+ LifetimePolicy.PERMANENT);
+ if(creatingSession != null)
+ {
+
+ switch(_exclusivityPolicy)
+ {
+
+ case PRINCIPAL:
+ _exclusiveOwner = creatingSession.getConnectionModel().getAuthorizedPrincipal();
+ break;
+ case CONTAINER:
+ _exclusiveOwner = creatingSession.getConnectionModel().getRemoteContainerName();
+ break;
+ case CONNECTION:
+ _exclusiveOwner = creatingSession.getConnectionModel();
+ addExclusivityConstraint(creatingSession.getConnectionModel());
+ break;
+ case SESSION:
+ _exclusiveOwner = creatingSession;
+ addExclusivityConstraint(creatingSession);
+ break;
+ case NONE:
+ case LINK:
+ // nothing to do as if link no link associated until there is a consumer associated
+ break;
+ default:
+ throw new ServerScopedRuntimeException("Unknown exclusivity policy: "
+ + _exclusivityPolicy
+ + " this is a coding error inside Qpid");
+ }
+ }
+ else if(_exclusivityPolicy == ExclusivityPolicy.PRINCIPAL)
+ {
+ String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
+ if(owner != null)
+ {
+ _exclusiveOwner = new AuthenticatedPrincipal(owner);
+ }
+ }
+ else if(_exclusivityPolicy == ExclusivityPolicy.CONTAINER)
+ {
+ String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
+ if(owner != null)
+ {
+ _exclusiveOwner = owner;
+ }
+ }
+
+
+ if(_lifetimePolicy == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE)
+ {
+ if(creatingSession != null)
+ {
+ addLifetimeConstraint(creatingSession.getConnectionModel());
+ }
+ else
+ {
+ throw new IllegalArgumentException("Queues created with a lifetime policy of "
+ + _lifetimePolicy
+ + " must be created from a connection.");
+ }
+ }
+ else if(_lifetimePolicy == LifetimePolicy.DELETE_ON_SESSION_END)
+ {
+ if(creatingSession != null)
+ {
+ addLifetimeConstraint(creatingSession);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Queues created with a lifetime policy of "
+ + _lifetimePolicy
+ + " must be created from a connection.");
+ }
+ }
if (name == null)
{
@@ -207,12 +293,18 @@ abstract class SimpleAMQQueue<E extends
_name = name;
_durable = durable;
- _owner = owner;
- _autoDelete = autoDelete;
- _exclusive = exclusive;
_virtualHost = virtualHost;
- _entries = entryListFactory.createQueueEntryList((Q)this);
- _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments));
+ _entries = entryListFactory.createQueueEntryList((Q) this);
+ final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
+
+ arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy);
+ arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy);
+
+ _arguments = Collections.synchronizedMap(arguments);
+ _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null);
+
+ _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false);
+
_id = id;
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
@@ -220,30 +312,113 @@ abstract class SimpleAMQQueue<E extends
_logSubject = new QueueLogSubject(this);
_logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
+
+ if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE))
+ {
+ setMaximumMessageAge(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, attributes));
+ }
+ else
+ {
+ setMaximumMessageAge(virtualHost.getDefaultAlertThresholdMessageAge());
+ }
+ if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE))
+ {
+ setMaximumMessageSize(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, attributes));
+ }
+ else
+ {
+ setMaximumMessageSize(virtualHost.getDefaultAlertThresholdMessageSize());
+ }
+ if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES))
+ {
+ setMaximumMessageCount(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
+ attributes));
+ }
+ else
+ {
+ setMaximumMessageCount(virtualHost.getDefaultAlertThresholdQueueDepthMessages());
+ }
+ if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES))
+ {
+ setMaximumQueueDepth(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
+ attributes));
+ }
+ else
+ {
+ setMaximumQueueDepth(virtualHost.getDefaultAlertThresholdQueueDepthBytes());
+ }
+ if (attributes.containsKey(Queue.ALERT_REPEAT_GAP))
+ {
+ setMinimumAlertRepeatGap(MapValueConverter.getLongAttribute(Queue.ALERT_REPEAT_GAP, attributes));
+ }
+ else
+ {
+ setMinimumAlertRepeatGap(virtualHost.getDefaultAlertRepeatGap());
+ }
+ if (attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES))
+ {
+ setCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, attributes));
+ }
+ else
+ {
+ setCapacity(virtualHost.getDefaultQueueFlowControlSizeBytes());
+ }
+ if (attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES))
+ {
+ setFlowResumeCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, attributes));
+ }
+ else
+ {
+ setFlowResumeCapacity(virtualHost.getDefaultQueueFlowResumeSizeBytes());
+ }
+ if (attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS))
+ {
+ setMaximumDeliveryCount(MapValueConverter.getIntegerAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS, attributes));
+ }
+ else
+ {
+ setMaximumDeliveryCount(virtualHost.getDefaultMaximumDeliveryAttempts());
+ }
+
+ final String ownerString;
+ switch(_exclusivityPolicy)
+ {
+ case PRINCIPAL:
+ ownerString = ((Principal) _exclusiveOwner).getName();
+ break;
+ case CONTAINER:
+ ownerString = (String) _exclusiveOwner;
+ break;
+ default:
+ ownerString = null;
+
+ }
+
// Log the creation of this Queue.
// The priorities display is toggled on if we set priorities > 0
CurrentActor.get().message(_logSubject,
- QueueMessages.CREATED(String.valueOf(_owner),
+ QueueMessages.CREATED(ownerString,
_entries.getPriorities(),
- _owner != null,
- autoDelete,
- durable, !durable,
+ ownerString != null ,
+ _lifetimePolicy != LifetimePolicy.PERMANENT,
+ durable,
+ !durable,
_entries.getPriorities() > 0));
- if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY))
+ if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY))
{
- if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
- && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
+ if(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
+ && (Boolean)(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
{
- Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
+ Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
_messageGroupManager =
- new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
+ new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)),
defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
this);
}
else
{
- _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(
+ _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(
Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
@@ -256,6 +431,38 @@ abstract class SimpleAMQQueue<E extends
}
+ private void addLifetimeConstraint(final Deletable<? extends Deletable> lifetimeObject)
+ {
+ final Action<Deletable> deleteQueueTask = new Action<Deletable>()
+ {
+ @Override
+ public void performAction(final Deletable object)
+ {
+ try
+ {
+ getVirtualHost().removeQueue(SimpleAMQQueue.this);
+ }
+ catch (QpidSecurityException e)
+ {
+ throw new ConnectionScopedRuntimeException("Unable to delete a queue even though the queue's " +
+ "lifetime was tied to an object being deleted");
+ }
+ }
+ };
+
+ lifetimeObject.addDeleteTask(deleteQueueTask);
+ addDeleteTask(new DeleteDeleteTask(lifetimeObject, deleteQueueTask));
+ }
+
+ private void addExclusivityConstraint(final Deletable<? extends Deletable> lifetimeObject)
+ {
+ final ClearOwnerAction clearOwnerAction = new ClearOwnerAction(lifetimeObject);
+ final DeleteDeleteTask deleteDeleteTask = new DeleteDeleteTask(lifetimeObject, clearOwnerAction);
+ clearOwnerAction.setDeleteTask(deleteDeleteTask);
+ lifetimeObject.addDeleteTask(clearOwnerAction);
+ addDeleteTask(deleteDeleteTask);
+ }
+
public void resetNotifications()
{
// This ensure that the notification checks for the configured alerts are created.
@@ -303,12 +510,7 @@ abstract class SimpleAMQQueue<E extends
public boolean isExclusive()
{
- return _exclusive;
- }
-
- public void setExclusive(boolean exclusive)
- {
- _exclusive = exclusive;
+ return _exclusivityPolicy != ExclusivityPolicy.NONE;
}
public Exchange getAlternateExchange()
@@ -342,27 +544,27 @@ abstract class SimpleAMQQueue<E extends
return _arguments.get(attrName);
}
- public boolean isAutoDelete()
+ @Override
+ public LifetimePolicy getLifetimePolicy()
{
- return _autoDelete;
+ return _lifetimePolicy;
}
public String getOwner()
{
- return _owner;
- }
-
- public AuthorizationHolder getAuthorizationHolder()
- {
- return _authorizationHolder;
- }
-
- public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
- {
- _authorizationHolder = authorizationHolder;
+ if(_exclusiveOwner != null)
+ {
+ switch(_exclusivityPolicy)
+ {
+ case CONTAINER:
+ return (String) _exclusiveOwner;
+ case PRINCIPAL:
+ return ((Principal)_exclusiveOwner).getName();
+ }
+ }
+ return null;
}
-
public VirtualHost getVirtualHost()
{
return _virtualHost;
@@ -381,7 +583,9 @@ abstract class SimpleAMQQueue<E extends
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- EnumSet<Consumer.Option> optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException
+ EnumSet<Consumer.Option> optionSet)
+ throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException,
+ ConsumerAccessRefused
{
// Access control
@@ -396,15 +600,77 @@ abstract class SimpleAMQQueue<E extends
throw new ExistingExclusiveConsumer();
}
+ switch(_exclusivityPolicy)
+ {
+ case CONNECTION:
+ if(_exclusiveOwner == null)
+ {
+ _exclusiveOwner = target.getSessionModel().getConnectionModel();
+ addExclusivityConstraint(target.getSessionModel().getConnectionModel());
+ }
+ else
+ {
+ if(_exclusiveOwner != target.getSessionModel().getConnectionModel())
+ {
+ throw new ConsumerAccessRefused();
+ }
+ }
+ break;
+ case SESSION:
+ if(_exclusiveOwner == null)
+ {
+ _exclusiveOwner = target.getSessionModel();
+ addExclusivityConstraint(target.getSessionModel());
+ }
+ else
+ {
+ if(_exclusiveOwner != target.getSessionModel())
+ {
+ throw new ConsumerAccessRefused();
+ }
+ }
+ break;
+ case LINK:
+ if(getConsumerCount() != 0)
+ {
+ throw new ConsumerAccessRefused();
+ }
+ break;
+ case PRINCIPAL:
+ if(_exclusiveOwner == null)
+ {
+ _exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+ }
+ else
+ {
+ if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
+ {
+ throw new ConsumerAccessRefused();
+ }
+ }
+ break;
+ case CONTAINER:
+ if(_exclusiveOwner == null)
+ {
+ _exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName();
+ }
+ else
+ {
+ if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName()))
+ {
+ throw new ConsumerAccessRefused();
+ }
+ }
+ break;
+ case NONE:
+ break;
+ default:
+ throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy);
+ }
boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE);
boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT);
- if (exclusive && !isTransient && getConsumerCount() != 0)
- {
- throw new ExistingConsumerPreventsExclusive();
- }
-
QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass,
optionSet.contains(Consumer.Option.ACQUIRES),
optionSet.contains(Consumer.Option.SEES_REQUEUES),
@@ -473,11 +739,12 @@ abstract class SimpleAMQQueue<E extends
consumer.close();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
+
consumer.setQueueContext(null);
- if(!isDeleted() && isExclusive() && getConsumerCount() == 0)
+ if(_exclusivityPolicy == ExclusivityPolicy.LINK)
{
- setAuthorizationHolder(null);
+ _exclusiveOwner = null;
}
if(_messageGroupManager != null)
@@ -495,8 +762,12 @@ abstract class SimpleAMQQueue<E extends
// auto-delete queues must be deleted if there are no remaining subscribers
- if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 )
+ if(!consumer.isTransient()
+ && ( _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+ || _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_LINKS )
+ && getConsumerCount() == 0)
{
+
if (_logger.isInfoEnabled())
{
_logger.info("Auto-deleting queue:" + this);
@@ -1266,12 +1537,14 @@ abstract class SimpleAMQQueue<E extends
});
}
- public void addQueueDeleteTask(final Action<AMQQueue> task)
+ @Override
+ public void addDeleteTask(final Action<? super Q> task)
{
_deleteTaskList.add(task);
}
- public void removeQueueDeleteTask(final Action<AMQQueue> task)
+ @Override
+ public void removeDeleteTask(final Action<? super Q> task)
{
_deleteTaskList.remove(task);
}
@@ -1343,9 +1616,9 @@ abstract class SimpleAMQQueue<E extends
}
- for (Action<AMQQueue> task : _deleteTaskList)
+ for (Action<? super Q> task : _deleteTaskList)
{
- task.performAction(this);
+ task.performAction((Q)this);
}
_deleteTaskList.clear();
@@ -1940,6 +2213,26 @@ abstract class SimpleAMQQueue<E extends
return _notificationChecks;
}
+ private static class DeleteDeleteTask implements Action<Deletable>
+ {
+
+ private final Deletable<? extends Deletable> _lifetimeObject;
+ private final Action<? super Deletable> _deleteQueueOwnerTask;
+
+ public DeleteDeleteTask(final Deletable<? extends Deletable> lifetimeObject,
+ final Action<? super Deletable> deleteQueueOwnerTask)
+ {
+ _lifetimeObject = lifetimeObject;
+ _deleteQueueOwnerTask = deleteQueueOwnerTask;
+ }
+
+ @Override
+ public void performAction(final Deletable object)
+ {
+ _lifetimeObject.removeDeleteTask(_deleteQueueOwnerTask);
+ }
+ }
+
private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State>
{
@@ -1990,38 +2283,6 @@ abstract class SimpleAMQQueue<E extends
return ids;
}
- public AMQSessionModel getExclusiveOwningSession()
- {
- return _exclusiveOwner;
- }
-
- public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner)
- {
- _exclusive = true;
- _exclusiveOwner = exclusiveOwner;
- }
-
-
- public void configure(QueueConfiguration config)
- {
- if (config != null)
- {
- setMaximumMessageAge(config.getMaximumMessageAge());
- setMaximumQueueDepth(config.getMaximumQueueDepth());
- setMaximumMessageSize(config.getMaximumMessageSize());
- setMaximumMessageCount(config.getMaximumMessageCount());
- setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
- setMaximumDeliveryCount(config.getMaxDeliveryCount());
- _capacity = config.getCapacity();
- _flowResumeCapacity = config.getFlowResumeCapacity();
- }
- }
-
- public long getMessageDequeueCount()
- {
- return _dequeueCount.get();
- }
-
public long getTotalEnqueueSize()
{
return _enqueueSize.get();
@@ -2130,20 +2391,13 @@ abstract class SimpleAMQQueue<E extends
@Override
public void setDescription(String description)
{
- if (description == null)
- {
- _arguments.remove(Queue.DESCRIPTION);
- }
- else
- {
- _arguments.put(Queue.DESCRIPTION, description);
- }
+ _description = description;
}
@Override
public String getDescription()
{
- return (String) _arguments.get(Queue.DESCRIPTION);
+ return _description;
}
public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
@@ -2176,4 +2430,228 @@ abstract class SimpleAMQQueue<E extends
}
+ @Override
+ public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
+ {
+ boolean allowed;
+ switch(_exclusivityPolicy)
+ {
+ case NONE:
+ allowed = true;
+ break;
+ case SESSION:
+ allowed = _exclusiveOwner == null || _exclusiveOwner == session;
+ break;
+ case CONNECTION:
+ allowed = _exclusiveOwner == null || _exclusiveOwner == session.getConnectionModel();
+ break;
+ case PRINCIPAL:
+ allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getAuthorizedPrincipal());
+ break;
+ case CONTAINER:
+ allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getRemoteContainerName());
+ break;
+ case LINK:
+ allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session;
+ break;
+ default:
+ throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy);
+ }
+ return allowed;
+ }
+
+ @Override
+ public synchronized void setExclusivityPolicy(final ExclusivityPolicy desiredPolicy)
+ throws ExistingConsumerPreventsExclusive
+ {
+ if(desiredPolicy != _exclusivityPolicy && !(desiredPolicy == null && _exclusivityPolicy == ExclusivityPolicy.NONE))
+ {
+ switch(desiredPolicy)
+ {
+ case NONE:
+ _exclusiveOwner = null;
+ break;
+ case PRINCIPAL:
+ switchToPrincipalExclusivity();
+ break;
+ case CONTAINER:
+ switchToContainerExclusivity();
+ break;
+ case CONNECTION:
+ switchToConnectionExclusivity();
+ break;
+ case SESSION:
+ switchToSessionExclusivity();
+ break;
+ case LINK:
+ switchToLinkExclusivity();
+ break;
+ }
+ _exclusivityPolicy = desiredPolicy;
+ }
+ }
+
+ private void switchToLinkExclusivity() throws ExistingConsumerPreventsExclusive
+ {
+ switch (getConsumerCount())
+ {
+ case 1:
+ _exclusiveSubscriber = getConsumerList().getHead().getConsumer();
+ // deliberate fall through
+ case 0:
+ _exclusiveOwner = null;
+ break;
+ default:
+ throw new ExistingConsumerPreventsExclusive();
+ }
+
+ }
+
+ private void switchToSessionExclusivity() throws ExistingConsumerPreventsExclusive
+ {
+
+ switch(_exclusivityPolicy)
+ {
+ case NONE:
+ case PRINCIPAL:
+ case CONTAINER:
+ case CONNECTION:
+ AMQSessionModel session = null;
+ for(Consumer c : getConsumers())
+ {
+ if(session == null)
+ {
+ session = c.getSessionModel();
+ }
+ else if(!session.equals(c.getSessionModel()))
+ {
+ throw new ExistingConsumerPreventsExclusive();
+ }
+ }
+ _exclusiveOwner = session;
+ break;
+ case LINK:
+ _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel();
+ }
+ }
+
+ private void switchToConnectionExclusivity() throws ExistingConsumerPreventsExclusive
+ {
+ switch(_exclusivityPolicy)
+ {
+ case NONE:
+ case CONTAINER:
+ case PRINCIPAL:
+ AMQConnectionModel con = null;
+ for(Consumer c : getConsumers())
+ {
+ if(con == null)
+ {
+ con = c.getSessionModel().getConnectionModel();
+ }
+ else if(!con.equals(c.getSessionModel().getConnectionModel()))
+ {
+ throw new ExistingConsumerPreventsExclusive();
+ }
+ }
+ _exclusiveOwner = con;
+ break;
+ case SESSION:
+ _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel();
+ break;
+ case LINK:
+ _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel();
+ }
+ }
+
+ private void switchToContainerExclusivity() throws ExistingConsumerPreventsExclusive
+ {
+ switch(_exclusivityPolicy)
+ {
+ case NONE:
+ case PRINCIPAL:
+ String containerID = null;
+ for(Consumer c : getConsumers())
+ {
+ if(containerID == null)
+ {
+ containerID = c.getSessionModel().getConnectionModel().getRemoteContainerName();
+ }
+ else if(!containerID.equals(c.getSessionModel().getConnectionModel().getRemoteContainerName()))
+ {
+ throw new ExistingConsumerPreventsExclusive();
+ }
+ }
+ _exclusiveOwner = containerID;
+ break;
+ case CONNECTION:
+ _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getRemoteContainerName();
+ break;
+ case SESSION:
+ _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getRemoteContainerName();
+ break;
+ case LINK:
+ _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getRemoteContainerName();
+ }
+ }
+
+ private void switchToPrincipalExclusivity() throws ExistingConsumerPreventsExclusive
+ {
+ switch(_exclusivityPolicy)
+ {
+ case NONE:
+ case CONTAINER:
+ Principal principal = null;
+ for(Consumer c : getConsumers())
+ {
+ if(principal == null)
+ {
+ principal = c.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+ }
+ else if(!principal.equals(c.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
+ {
+ throw new ExistingConsumerPreventsExclusive();
+ }
+ }
+ _exclusiveOwner = principal;
+ break;
+ case CONNECTION:
+ _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getAuthorizedPrincipal();
+ break;
+ case SESSION:
+ _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getAuthorizedPrincipal();
+ break;
+ case LINK:
+ _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+ }
+ }
+
+ private class ClearOwnerAction implements Action<Deletable>
+ {
+ private final Deletable<? extends Deletable> _lifetimeObject;
+ private DeleteDeleteTask _deleteTask;
+
+ public ClearOwnerAction(final Deletable<? extends Deletable> lifetimeObject)
+ {
+ _lifetimeObject = lifetimeObject;
+ }
+
+ @Override
+ public void performAction(final Deletable object)
+ {
+ if(SimpleAMQQueue.this._exclusiveOwner == _lifetimeObject)
+ {
+ SimpleAMQQueue.this._exclusiveOwner = null;
+ }
+ if(_deleteTask != null)
+ {
+ removeDeleteTask(_deleteTask);
+ }
+ }
+
+ public void setDeleteTask(final DeleteDeleteTask deleteTask)
+ {
+ _deleteTask = deleteTask;
+ }
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Mon Feb 17 20:48:05 2014
@@ -21,11 +21,13 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
-import java.util.UUID;
public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
{
@@ -35,28 +37,26 @@ public class SortedQueue extends OutOfOr
private final Object _sortedQueueLock = new Object();
private final String _sortedPropertyName;
- protected SortedQueue(UUID id, final String name,
- final boolean durable, final String owner, final boolean autoDelete,
- final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
+ protected SortedQueue(VirtualHost virtualHost,
+ final AMQSessionModel creatingSession,
+ Map<String, Object> attributes,
+ QueueEntryListFactory<SortedQueueEntry, SortedQueue, SortedQueueEntryList> factory)
{
- this(id, name, durable, owner, autoDelete, exclusive,
- virtualHost, arguments, sortedPropertyName, new SortedQueueEntryListFactory(sortedPropertyName));
+ super(virtualHost, creatingSession, attributes, factory);
+ _sortedPropertyName = MapValueConverter.getStringAttribute(Queue.SORT_KEY,attributes);
}
- protected SortedQueue(UUID id, final String name,
- final boolean durable, final String owner, final boolean autoDelete,
- final boolean exclusive, final VirtualHost virtualHost,
- Map<String, Object> arguments,
- String sortedPropertyName,
- QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList> factory)
+ protected SortedQueue(VirtualHost virtualHost,
+ final AMQSessionModel creatingSession, Map<String, Object> attributes)
{
- super(id, name, durable, owner, autoDelete, exclusive,
- virtualHost, factory, arguments);
- this._sortedPropertyName = sortedPropertyName;
+ this(virtualHost,
+ creatingSession, attributes,
+ new SortedQueueEntryListFactory(MapValueConverter.getStringAttribute(Queue.SORT_KEY, attributes)));
}
+
public String getSortedPropertyName()
{
return _sortedPropertyName;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java Mon Feb 17 20:48:05 2014
@@ -20,22 +20,16 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
-import java.util.UUID;
public class StandardQueue extends SimpleAMQQueue<StandardQueueEntry,StandardQueue,StandardQueueEntryList>
{
- public StandardQueue(final UUID id,
- final String name,
- final boolean durable,
- final String owner,
- final boolean autoDelete,
- final boolean exclusive,
- final VirtualHost virtualHost,
- final Map<String, Object> arguments)
+ public StandardQueue(final VirtualHost virtualHost,
+ final AMQSessionModel creatingSession, final Map<String, Object> arguments)
{
- super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new StandardQueueEntryList.Factory(), arguments);
+ super(virtualHost, creatingSession, arguments, new StandardQueueEntryList.Factory());
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java Mon Feb 17 20:48:05 2014
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.queue.AMQQueue;
/**
@@ -139,8 +140,8 @@ public class ObjectProperties
{
setName(queue.getName());
- put(Property.AUTO_DELETE, queue.isAutoDelete());
- put(Property.TEMPORARY, queue.isAutoDelete());
+ put(Property.AUTO_DELETE, queue.getLifetimePolicy() != LifetimePolicy.PERMANENT);
+ put(Property.TEMPORARY, queue.getLifetimePolicy() != LifetimePolicy.PERMANENT);
put(Property.DURABLE, queue.isDurable());
put(Property.EXCLUSIVE, queue.isExclusive());
if (queue.getAlternateExchange() != null)
@@ -151,10 +152,7 @@ public class ObjectProperties
{
put(Property.OWNER, queue.getOwner());
}
- else if (queue.getAuthorizationHolder() != null)
- {
- put(Property.OWNER, queue.getAuthorizationHolder().getAuthorizedPrincipal().getName());
- }
+
}
public ObjectProperties(Exchange exch, AMQQueue queue, String routingKey)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java Mon Feb 17 20:48:05 2014
@@ -42,17 +42,11 @@ public class DurableConfigurationStoreHe
private static final String BINDING = Binding.class.getSimpleName();
private static final String EXCHANGE = Exchange.class.getSimpleName();
private static final String QUEUE = Queue.class.getSimpleName();
- private static final Set<String> QUEUE_ARGUMENTS_EXCLUDES = new HashSet<String>(Arrays.asList(Queue.NAME,
- Queue.OWNER,
- Queue.EXCLUSIVE,
- Queue.ALTERNATE_EXCHANGE));
+ private static final Set<String> QUEUE_ARGUMENTS_EXCLUDES = new HashSet<String>(Arrays.asList(Queue.ALTERNATE_EXCHANGE));
public static void updateQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue)
{
Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
- attributesMap.put(Queue.NAME, queue.getName());
- attributesMap.put(Queue.OWNER, queue.getOwner());
- attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
if (queue.getAlternateExchange() != null)
{
@@ -75,9 +69,6 @@ public class DurableConfigurationStoreHe
public static void createQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue)
{
Map<String, Object> attributesMap = new HashMap<String, Object>();
- attributesMap.put(Queue.NAME, queue.getName());
- attributesMap.put(Queue.OWNER, queue.getOwner());
- attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
if (queue.getAlternateExchange() != null)
{
attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
@@ -103,7 +94,7 @@ public class DurableConfigurationStoreHe
Map<String, Object> attributesMap = new HashMap<String, Object>();
attributesMap.put(Exchange.NAME, exchange.getName());
attributesMap.put(Exchange.TYPE, exchange.getTypeName());
- attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name()
+ attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name()
: LifetimePolicy.PERMANENT.name());
store.create(exchange.getId(), EXCHANGE, attributesMap);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java Mon Feb 17 20:48:05 2014
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
public class MapValueConverter
{
@@ -217,6 +218,13 @@ public class MapValueConverter
return getIntegerAttribute(name, attributes, null);
}
+ public static Long getLongAttribute(String name, Map<String,Object> attributes)
+ {
+ assertMandatoryAttribute(name, attributes);
+ Object obj = attributes.get(name);
+ return toLong(name, obj, null);
+ }
+
public static Long getLongAttribute(String name, Map<String,Object> attributes, Long defaultValue)
{
Object obj = attributes.get(name);
@@ -409,4 +417,41 @@ public class MapValueConverter
return (T) value;
}
+
+ public static UUID getUUIDAttribute(String name, Map<String, Object> attributes)
+ {
+ assertMandatoryAttribute(name, attributes);
+ return getUUIDAttribute(name, attributes, null);
+ }
+
+ public static UUID getUUIDAttribute(String name, Map<String,Object> attributes, UUID defaultVal)
+ {
+ final Object value = attributes.get(name);
+ return toUUID(value, defaultVal);
+ }
+
+ private static UUID toUUID(final Object value, final UUID defaultVal)
+ {
+ if(value == null)
+ {
+ return defaultVal;
+ }
+ else if(value instanceof UUID)
+ {
+ return (UUID)value;
+ }
+ else if(value instanceof String)
+ {
+ return UUID.fromString((String)value);
+ }
+ else if(value instanceof byte[])
+ {
+ return UUID.nameUUIDFromBytes((byte[])value);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Cannot convert " + value.getClass().getName() + " to UUID");
+ }
+ }
+
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Feb 17 20:48:05 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualho
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -34,6 +35,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -69,7 +73,9 @@ import org.apache.qpid.server.store.Dura
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
@@ -529,7 +535,10 @@ public abstract class AbstractVirtualHos
int purged = queue.delete();
getQueueRegistry().unregisterQueue(queue.getName());
- if (queue.isDurable() && !queue.isAutoDelete())
+ if (queue.isDurable() && !(queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ || queue.getLifetimePolicy()
+ == LifetimePolicy.DELETE_ON_SESSION_END))
{
DurableConfigurationStore store = getDurableConfigurationStore();
DurableConfigurationStoreHelper.removeQueue(store, queue);
@@ -538,26 +547,24 @@ public abstract class AbstractVirtualHos
}
}
- @Override
- public AMQQueue createQueue(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
- Map<String, Object> arguments) throws QpidSecurityException, QueueExistsException
+ public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes) throws QpidSecurityException, QueueExistsException
{
+ // make a copy as we may augment (with an ID for example)
+ attributes = new LinkedHashMap<String, Object>(attributes);
- if (queueName == null)
- {
- throw new IllegalArgumentException("Queue name must not be null");
- }
+ String queueName = MapValueConverter.getStringAttribute(Queue.NAME, attributes);
+ boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+ Queue.LIFETIME_POLICY,
+ attributes,
+ LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
+ boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false);
+ ExclusivityPolicy exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE);
+ String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
- // Access check
+ // Access check
if (!getSecurityManager().authoriseCreateQueue(autoDelete,
durable,
- exclusive,
+ exclusive != null && exclusive != ExclusivityPolicy.NONE,
null,
null,
queueName,
@@ -573,22 +580,27 @@ public abstract class AbstractVirtualHos
{
throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName));
}
- if(id == null)
+ if(!attributes.containsKey(Queue.ID))
{
- id = UUIDGenerator.generateExchangeUUID(queueName, getName());
+ UUID id = UUIDGenerator.generateExchangeUUID(queueName, getName());
while(_queueRegistry.getQueue(id) != null)
{
id = UUID.randomUUID();
}
+ attributes.put(Queue.ID, id);
}
- else if(_queueRegistry.getQueue(id) != null)
+ else if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID, attributes)) != null)
{
- throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName));
+ throw new QueueExistsException("Queue with id "
+ + MapValueConverter.getUUIDAttribute(Queue.ID,
+ attributes)
+ + " already exists", _queueRegistry.getQueue(queueName));
}
- return _queueFactory.createQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer,
- arguments);
+
+
+ return _queueFactory.createQueue(creatingSession, attributes);
}
}
@@ -980,13 +992,13 @@ public abstract class AbstractVirtualHos
// house keeping task from running.
}
}
- for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+ for (AMQConnectionModel<?,?> connection : getConnectionRegistry().getConnections())
{
if (_logger.isDebugEnabled())
{
_logger.debug("Checking for long running open transactions on connection " + connection);
}
- for (AMQSessionModel session : connection.getSessionModels())
+ for (AMQSessionModel<?,?> session : connection.getSessionModels())
{
if (_logger.isDebugEnabled())
{
@@ -1046,5 +1058,54 @@ public abstract class AbstractVirtualHos
{
return _model;
}
+
+ }
+
+ @Override
+ public long getDefaultAlertThresholdMessageAge()
+ {
+ return getConfiguration().getMaximumMessageAge();
+ }
+
+ @Override
+ public long getDefaultAlertThresholdMessageSize()
+ {
+ return getConfiguration().getMaximumMessageSize();
+ }
+
+ @Override
+ public long getDefaultAlertThresholdQueueDepthMessages()
+ {
+ return getConfiguration().getMaximumMessageCount();
+ }
+
+ @Override
+ public long getDefaultAlertThresholdQueueDepthBytes()
+ {
+ return getConfiguration().getMaximumQueueDepth();
+ }
+
+ @Override
+ public long getDefaultAlertRepeatGap()
+ {
+ return getConfiguration().getMinimumAlertRepeatGap();
+ }
+
+ @Override
+ public long getDefaultQueueFlowControlSizeBytes()
+ {
+ return getConfiguration().getCapacity();
+ }
+
+ @Override
+ public long getDefaultQueueFlowResumeSizeBytes()
+ {
+ return getConfiguration().getFlowResumeCapacity();
+ }
+
+ @Override
+ public int getDefaultMaximumDeliveryAttempts()
+ {
+ return getConfiguration().getMaxDeliveryCount();
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java Mon Feb 17 20:48:05 2014
@@ -42,6 +42,7 @@ import static org.apache.qpid.server.mod
public class DefaultUpgraderProvider implements UpgraderProvider
{
+ public static final String EXCLUSIVE = "exclusive";
private final ExchangeRegistry _exchangeRegistry;
private final VirtualHost _virtualHost;
@@ -63,7 +64,8 @@ public class DefaultUpgraderProvider imp
currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader());
case 2:
currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader());
-
+ case 3:
+ currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader());
case CURRENT_CONFIG_VERSION:
currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer));
break;
@@ -263,4 +265,49 @@ public class DefaultUpgraderProvider imp
}
}
+ /*
+ * Convert the storage of queue attribute exclusive to change exclusive from a boolean to an enum
+ * where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER"
+ * ensure OWNER is null unless the exclusivity policy is CONTAINER
+ */
+ private class Version3Upgrader extends NonNullUpgrader
+ {
+
+ @Override
+ public void configuredObject(UUID id, String type, Map<String, Object> attributes)
+ {
+ if(Queue.class.getSimpleName().equals(type))
+ {
+ Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(attributes);
+ if(attributes.get(EXCLUSIVE) instanceof Boolean)
+ {
+ boolean isExclusive = (Boolean) attributes.get(EXCLUSIVE);
+ newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE");
+ if(!isExclusive && attributes.containsKey("owner"))
+ {
+ newAttributes.remove("owner");
+ }
+ }
+ else
+ {
+ newAttributes.remove("owner");
+ }
+ if(!attributes.containsKey("durable"))
+ {
+ newAttributes.put("durable","true");
+ }
+ attributes = newAttributes;
+ getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes));
+ }
+
+ getNextUpgrader().configuredObject(id,type,attributes);
+ }
+
+ @Override
+ public void complete()
+ {
+ getNextUpgrader().complete();
+ }
+ }
+
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java Mon Feb 17 20:48:05 2014
@@ -69,7 +69,7 @@ public class ExchangeRecoverer extends A
String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE);
String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY);
boolean autoDelete = lifeTimePolicy == null
- || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE;
+ || LifetimePolicy.valueOf(lifeTimePolicy) != LifetimePolicy.PERMANENT;
try
{
_exchange = _exchangeRegistry.getExchange(id);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java Mon Feb 17 20:48:05 2014
@@ -104,13 +104,6 @@ public class QueueRecoverer extends Abst
public AMQQueue resolve()
{
String queueName = (String) _attributes.get(Queue.NAME);
- String owner = (String) _attributes.get(Queue.OWNER);
- boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE);
-
- Map<String, Object> queueArgumentsMap = new LinkedHashMap<String, Object>(_attributes);
- queueArgumentsMap.remove(Queue.NAME);
- queueArgumentsMap.remove(Queue.OWNER);
- queueArgumentsMap.remove(Queue.EXCLUSIVE);
try
{
@@ -122,8 +115,9 @@ public class QueueRecoverer extends Abst
if (_queue == null)
{
- _queue = _queueFactory.restoreQueue(_id, queueName, owner, false, exclusive,
- false, queueArgumentsMap);
+ Map<String, Object> attributes = new LinkedHashMap<String, Object>(_attributes);
+ attributes.put(Queue.ID, _id);
+ _queue = _queueFactory.restoreQueue(attributes);
}
}
catch (QpidSecurityException e)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Mon Feb 17 20:48:05 2014
@@ -33,6 +33,7 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.QpidSecurityException;
@@ -59,14 +60,7 @@ public interface VirtualHost extends Dur
int removeQueue(AMQQueue queue) throws QpidSecurityException;
- AMQQueue createQueue(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
- Map<String, Object> arguments) throws QueueExistsException, QpidSecurityException;
+ AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments) throws QueueExistsException, QpidSecurityException;
Exchange createExchange(UUID id,
@@ -130,4 +124,20 @@ public interface VirtualHost extends Dur
public void block();
public void unblock();
+
+ long getDefaultAlertThresholdMessageAge();
+
+ long getDefaultAlertThresholdMessageSize();
+
+ long getDefaultAlertThresholdQueueDepthMessages();
+
+ long getDefaultAlertThresholdQueueDepthBytes();
+
+ long getDefaultAlertRepeatGap();
+
+ long getDefaultQueueFlowControlSizeBytes();
+
+ long getDefaultQueueFlowResumeSizeBytes();
+
+ int getDefaultMaximumDeliveryAttempts();
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java Mon Feb 17 20:48:05 2014
@@ -137,9 +137,6 @@ public class QueueConfigurationTest exte
qConf = new QueueConfiguration("test", vhostConfig);
assertEquals(2, qConf.getMaximumMessageAge());
- // Check inherited value
- qConf = new QueueConfiguration("test", _fullHostConf);
- assertEquals(1, qConf.getMaximumMessageAge());
}
public void testGetMaximumQueueDepth() throws ConfigurationException
@@ -153,9 +150,6 @@ public class QueueConfigurationTest exte
qConf = new QueueConfiguration("test", vhostConfig);
assertEquals(2, qConf.getMaximumQueueDepth());
- // Check inherited value
- qConf = new QueueConfiguration("test", _fullHostConf);
- assertEquals(1, qConf.getMaximumQueueDepth());
}
public void testGetMaximumMessageSize() throws ConfigurationException
@@ -169,9 +163,6 @@ public class QueueConfigurationTest exte
qConf = new QueueConfiguration("test", vhostConfig);
assertEquals(2, qConf.getMaximumMessageSize());
- // Check inherited value
- qConf = new QueueConfiguration("test", _fullHostConf);
- assertEquals(1, qConf.getMaximumMessageSize());
}
public void testGetMaximumMessageCount() throws ConfigurationException
@@ -185,22 +176,11 @@ public class QueueConfigurationTest exte
qConf = new QueueConfiguration("test", vhostConfig);
assertEquals(2, qConf.getMaximumMessageCount());
- // Check inherited value
- qConf = new QueueConfiguration("test", _fullHostConf);
- assertEquals(1, qConf.getMaximumMessageCount());
}
public void testGetMinimumAlertRepeatGap() throws Exception
{
- // set broker attribute ALERT_REPEAT_GAP to 10
- when(_broker.getAttribute(Broker.QUEUE_ALERT_REPEAT_GAP)).thenReturn(10);
-
- // check that broker level setting is available on queue configuration
QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
- assertEquals(10, qConf.getMinimumAlertRepeatGap());
-
- // remove configuration for ALERT_REPEAT_GAP on broker level
- when(_broker.getAttribute(Broker.QUEUE_ALERT_REPEAT_GAP)).thenReturn(null);
// Check default value
qConf = new QueueConfiguration("test", _emptyConf);
@@ -211,9 +191,6 @@ public class QueueConfigurationTest exte
qConf = new QueueConfiguration("test", vhostConfig);
assertEquals(2, qConf.getMinimumAlertRepeatGap());
- // Check inherited value
- qConf = new QueueConfiguration("test", _fullHostConf);
- assertEquals(1, qConf.getMinimumAlertRepeatGap());
}
public void testSortQueueConfiguration() throws ConfigurationException
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Mon Feb 17 20:48:05 2014
@@ -35,8 +35,10 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
+import java.security.Principal;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -333,14 +335,26 @@ public class MockConsumer implements Con
}
@Override
- public int compareTo(AMQSessionModel o)
+ public void close(AMQConstant cause, String message)
{
- return getId().compareTo(o.getId());
}
@Override
- public void close(AMQConstant cause, String message)
+ public void addDeleteTask(final Action task)
{
+
+ }
+
+ @Override
+ public void removeDeleteTask(final Action task)
+ {
+
+ }
+
+ @Override
+ public int compareTo(final Object o)
+ {
+ return 0;
}
}
@@ -431,12 +445,6 @@ public class MockConsumer implements Con
}
@Override
- public String getUserName()
- {
- return null;
- }
-
- @Override
public boolean isSessionNameUnique(byte[] name)
{
return false;
@@ -455,6 +463,12 @@ public class MockConsumer implements Con
}
@Override
+ public String getRemoteContainerName()
+ {
+ return null;
+ }
+
+ @Override
public String getClientVersion()
{
return null;
@@ -467,7 +481,7 @@ public class MockConsumer implements Con
}
@Override
- public String getPrincipalAsString()
+ public Principal getAuthorizedPrincipal()
{
return null;
}
@@ -512,5 +526,17 @@ public class MockConsumer implements Con
{
return null;
}
+
+ @Override
+ public void addDeleteTask(final Action task)
+ {
+
+ }
+
+ @Override
+ public void removeDeleteTask(final Action task)
+ {
+
+ }
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Mon Feb 17 20:48:05 2014
@@ -20,17 +20,23 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
import junit.framework.Assert;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -70,10 +76,17 @@ public class TopicExchangeTest extends Q
}
}
+ private AMQQueue<?,?,?> createQueue(String name) throws QpidSecurityException, QueueExistsException
+ {
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+ attributes.put(Queue.NAME, name);
+ return _vhost.createQueue(null, attributes);
+ }
+
public void testNoRoute() throws Exception
{
- AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false,
- false, null);
+ AMQQueue<?,?,?> queue = createQueue("a*#b");
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
@@ -84,8 +97,7 @@ public class TopicExchangeTest extends Q
public void testDirectMatch() throws Exception
{
- AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
- false, null);
+ AMQQueue<?,?,?> queue = createQueue("ab");
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
@@ -107,7 +119,7 @@ public class TopicExchangeTest extends Q
public void testStarMatch() throws Exception
{
- AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
+ AMQQueue<?,?,?> queue = createQueue("a*");
_exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null));
@@ -138,7 +150,7 @@ public class TopicExchangeTest extends Q
public void testHashMatch() throws Exception
{
- AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
+ AMQQueue<?,?,?> queue = createQueue("a#");
_exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null));
@@ -189,8 +201,7 @@ public class TopicExchangeTest extends Q
public void testMidHash() throws Exception
{
- AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
- false, null);
+ AMQQueue<?,?,?> queue = createQueue("a");
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
routeMessage("a.c.d.b",0l);
@@ -215,8 +226,7 @@ public class TopicExchangeTest extends Q
public void testMatchAfterHash() throws Exception
{
- AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
- false, null);
+ AMQQueue<?,?,?> queue = createQueue("a#");
_exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null));
@@ -254,8 +264,7 @@ public class TopicExchangeTest extends Q
public void testHashAfterHash() throws Exception
{
- AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
- false, null);
+ AMQQueue<?,?,?> queue = createQueue("a#");
_exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null));
int queueCount = routeMessage("a.c.b.b.c",0l);
@@ -276,8 +285,7 @@ public class TopicExchangeTest extends Q
public void testHashHash() throws Exception
{
- AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
- false, null);
+ AMQQueue<?,?,?> queue = createQueue("a#");
_exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null));
int queueCount = routeMessage("a.c.b.b.c",0l);
@@ -298,8 +306,7 @@ public class TopicExchangeTest extends Q
public void testSubMatchFails() throws Exception
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
- false, null);
+ AMQQueue queue = createQueue("a");
_exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null));
int queueCount = routeMessage("a.b.c",0l);
@@ -328,8 +335,7 @@ public class TopicExchangeTest extends Q
public void testMoreRouting() throws Exception
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
- false, null);
+ AMQQueue queue = createQueue("a");
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
@@ -342,8 +348,7 @@ public class TopicExchangeTest extends Q
public void testMoreQueue() throws Exception
{
- AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
- false, null);
+ AMQQueue queue = createQueue("a");
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java Mon Feb 17 20:48:05 2014
@@ -22,6 +22,8 @@ package org.apache.qpid.server.logging.s
import org.apache.qpid.server.protocol.AMQConnectionModel;
+import java.security.Principal;
+
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -43,9 +45,12 @@ public class ConnectionLogSubjectTest ex
{
super.setUp();
+ final Principal principal = mock(Principal.class);
+ when(principal.getName()).thenReturn(USER);
+
_connection = mock(AMQConnectionModel.class);
when(_connection.getConnectionId()).thenReturn(CONNECTION_ID);
- when(_connection.getPrincipalAsString()).thenReturn(USER);
+ when(_connection.getAuthorizedPrincipal()).thenReturn(principal);
when(_connection.getRemoteAddressString()).thenReturn("/"+IP_STRING);
when(_connection.getVirtualHostName()).thenReturn(VHOST);
_subject = new ConnectionLogSubject(_connection);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org