You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/17 21:48:06 UTC

svn commit: r1569109 [2/5] - in /qpid/trunk/qpid/java: ./ amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/ bdbstore/src/test/java/org/apache/qpid/server/store/berk...

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java Mon Feb 17 20:48:05 2014
@@ -20,23 +20,31 @@
 */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
-import java.util.UUID;
 
 public class PriorityQueue extends OutOfOrderQueue<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList>
 {
-    protected PriorityQueue(UUID id,
-                            final String name,
-                            final boolean durable,
-                            final String owner,
-                            final boolean autoDelete,
-                            boolean exclusive,
-                            final VirtualHost virtualHost,
-                            Map<String, Object> arguments, int priorities)
+
+    public static final int DEFAULT_PRIORITY_LEVELS = 10;
+
+    protected PriorityQueue(VirtualHost virtualHost,
+                            final AMQSessionModel creatingSession,
+                            Map<String, Object> attributes)
+    {
+        super(virtualHost, creatingSession, attributes, entryList(attributes));
+    }
+
+    private static PriorityQueueList.Factory entryList(final Map<String, Object> attributes)
     {
-        super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
+        final Integer priorities = MapValueConverter.getIntegerAttribute(Queue.PRIORITIES, attributes,
+                                                                         DEFAULT_PRIORITY_LEVELS);
+
+        return new PriorityQueueList.Factory(priorities);
     }
 
     public int getPriorities()

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Mon Feb 17 20:48:05 2014
@@ -109,7 +109,7 @@ public class QueueArgumentsConverter
             }
             if(wireArguments.containsKey(QPID_LAST_VALUE_QUEUE) && !wireArguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
             {
-                modelArguments.put(Queue.LVQ_KEY, AMQQueueFactory.QPID_DEFAULT_LVQ_KEY);
+                modelArguments.put(Queue.LVQ_KEY, ConflationQueue.DEFAULT_LVQ_KEY);
             }
             if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
             {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java Mon Feb 17 20:48:05 2014
@@ -22,25 +22,15 @@ package org.apache.qpid.server.queue;
 
 import java.util.Map;
 import java.util.UUID;
+
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.QpidSecurityException;
 
 public interface QueueFactory
 {
-    AMQQueue createQueue(UUID id,
-                         String queueName,
-                         boolean durable,
-                         String owner,
-                         boolean autoDelete,
-                         boolean exclusive,
-                         boolean deleteOnNoConsumer,
+    AMQQueue createQueue(final AMQSessionModel creatingSession,
                          Map<String, Object> arguments) throws QpidSecurityException;
 
-    AMQQueue restoreQueue(UUID id,
-                          String queueName,
-                          String owner,
-                          boolean autoDelete,
-                          boolean exclusive,
-                          boolean deleteOnNoConsumer,
-                          Map<String, Object> arguments) throws QpidSecurityException;
+    AMQQueue restoreQueue(Map<String, Object> arguments) throws QpidSecurityException;
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Feb 17 20:48:05 2014
@@ -18,6 +18,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.security.Principal;
 import java.util.*;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -28,11 +29,14 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.LogActor;
@@ -50,12 +54,16 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.Deletable;
+import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -78,19 +86,10 @@ abstract class SimpleAMQQueue<E extends 
     private final String _name;
 
     /** null means shared */
-    private final String _owner;
-
-    private AuthorizationHolder _authorizationHolder;
-
-    private boolean _exclusive = false;
-    private AMQSessionModel _exclusiveOwner;
-
+    private String _description;
 
     private final boolean _durable;
 
-    /** If true, this queue is deleted when the last subscriber is removed */
-    private final boolean _autoDelete;
-
     private Exchange _alternateExchange;
 
 
@@ -142,6 +141,10 @@ abstract class SimpleAMQQueue<E extends 
 
     private long _flowResumeCapacity;
 
+    private ExclusivityPolicy _exclusivityPolicy;
+    private LifetimePolicy _lifetimePolicy;
+    private Object _exclusiveOwner; // could be a connection, a session, a Principal or a container name (String)
+
     private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
 
 
@@ -157,7 +160,8 @@ abstract class SimpleAMQQueue<E extends 
     private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
-    private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>();
+    private final List<Action<? super Q>> _deleteTaskList =
+            new CopyOnWriteArrayList<Action<? super Q>>();
 
 
     private LogSubject _logSubject;
@@ -184,16 +188,98 @@ abstract class SimpleAMQQueue<E extends 
     private AMQQueue.NotificationListener _notificationListener;
     private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
 
-
-    protected SimpleAMQQueue(UUID id,
-                             String name,
-                             boolean durable,
-                             String owner,
-                             boolean autoDelete,
-                             boolean exclusive,
-                             VirtualHost virtualHost,
-                             QueueEntryListFactory<E,Q,L> entryListFactory, Map<String,Object> arguments)
+    protected SimpleAMQQueue(VirtualHost virtualHost,
+                             final AMQSessionModel<?,?> creatingSession,
+                             Map<String, Object> attributes,
+                             QueueEntryListFactory<E, Q, L> entryListFactory)
     {
+        UUID id = MapValueConverter.getUUIDAttribute(Queue.ID, attributes);
+        String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes);
+        boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE,attributes,false);
+
+
+        _exclusivityPolicy = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,
+                                                                Queue.EXCLUSIVE,
+                                                                attributes,
+                                                                ExclusivityPolicy.NONE);
+        _lifetimePolicy = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+                                                             Queue.LIFETIME_POLICY,
+                                                             attributes,
+                                                             LifetimePolicy.PERMANENT);
+        if(creatingSession != null)
+        {
+
+            switch(_exclusivityPolicy)
+            {
+
+                case PRINCIPAL:
+                    _exclusiveOwner = creatingSession.getConnectionModel().getAuthorizedPrincipal();
+                    break;
+                case CONTAINER:
+                    _exclusiveOwner = creatingSession.getConnectionModel().getRemoteContainerName();
+                    break;
+                case CONNECTION:
+                    _exclusiveOwner = creatingSession.getConnectionModel();
+                    addExclusivityConstraint(creatingSession.getConnectionModel());
+                    break;
+                case SESSION:
+                    _exclusiveOwner = creatingSession;
+                    addExclusivityConstraint(creatingSession);
+                    break;
+                case NONE:
+                case LINK:
+                    // nothing to do here; for LINK, no link is associated until a consumer is attached
+                    break;
+                default:
+                    throw new ServerScopedRuntimeException("Unknown exclusivity policy: "
+                                                           + _exclusivityPolicy
+                                                           + " this is a coding error inside Qpid");
+            }
+        }
+        else if(_exclusivityPolicy == ExclusivityPolicy.PRINCIPAL)
+        {
+            String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
+            if(owner != null)
+            {
+                _exclusiveOwner = new AuthenticatedPrincipal(owner);
+            }
+        }
+        else if(_exclusivityPolicy == ExclusivityPolicy.CONTAINER)
+        {
+            String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
+            if(owner != null)
+            {
+                _exclusiveOwner = owner;
+            }
+        }
+
+
+        if(_lifetimePolicy == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE)
+        {
+            if(creatingSession != null)
+            {
+                addLifetimeConstraint(creatingSession.getConnectionModel());
+            }
+            else
+            {
+                throw new IllegalArgumentException("Queues created with a lifetime policy of "
+                                                   + _lifetimePolicy
+                                                   + " must be created from a connection.");
+            }
+        }
+        else if(_lifetimePolicy == LifetimePolicy.DELETE_ON_SESSION_END)
+        {
+            if(creatingSession != null)
+            {
+                addLifetimeConstraint(creatingSession);
+            }
+            else
+            {
+                throw new IllegalArgumentException("Queues created with a lifetime policy of "
+                                                   + _lifetimePolicy
+                                                   + " must be created from a connection.");
+            }
+        }
 
         if (name == null)
         {
@@ -207,12 +293,18 @@ abstract class SimpleAMQQueue<E extends 
 
         _name = name;
         _durable = durable;
-        _owner = owner;
-        _autoDelete = autoDelete;
-        _exclusive = exclusive;
         _virtualHost = virtualHost;
-        _entries = entryListFactory.createQueueEntryList((Q)this);
-        _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments));
+        _entries = entryListFactory.createQueueEntryList((Q) this);
+        final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
+
+        arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy);
+        arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy);
+
+        _arguments = Collections.synchronizedMap(arguments);
+        _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null);
+
+        _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false);
+
 
         _id = id;
         _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
@@ -220,30 +312,113 @@ abstract class SimpleAMQQueue<E extends 
         _logSubject = new QueueLogSubject(this);
         _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
 
+
+        if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE))
+        {
+            setMaximumMessageAge(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, attributes));
+        }
+        else
+        {
+            setMaximumMessageAge(virtualHost.getDefaultAlertThresholdMessageAge());
+        }
+        if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE))
+        {
+            setMaximumMessageSize(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, attributes));
+        }
+        else
+        {
+            setMaximumMessageSize(virtualHost.getDefaultAlertThresholdMessageSize());
+        }
+        if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES))
+        {
+            setMaximumMessageCount(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
+                                                                      attributes));
+        }
+        else
+        {
+            setMaximumMessageCount(virtualHost.getDefaultAlertThresholdQueueDepthMessages());
+        }
+        if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES))
+        {
+            setMaximumQueueDepth(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
+                                                                    attributes));
+        }
+        else
+        {
+            setMaximumQueueDepth(virtualHost.getDefaultAlertThresholdQueueDepthBytes());
+        }
+        if (attributes.containsKey(Queue.ALERT_REPEAT_GAP))
+        {
+            setMinimumAlertRepeatGap(MapValueConverter.getLongAttribute(Queue.ALERT_REPEAT_GAP, attributes));
+        }
+        else
+        {
+            setMinimumAlertRepeatGap(virtualHost.getDefaultAlertRepeatGap());
+        }
+        if (attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES))
+        {
+            setCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, attributes));
+        }
+        else
+        {
+            setCapacity(virtualHost.getDefaultQueueFlowControlSizeBytes());
+        }
+        if (attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES))
+        {
+            setFlowResumeCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, attributes));
+        }
+        else
+        {
+            setFlowResumeCapacity(virtualHost.getDefaultQueueFlowResumeSizeBytes());
+        }
+        if (attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS))
+        {
+            setMaximumDeliveryCount(MapValueConverter.getIntegerAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS, attributes));
+        }
+        else
+        {
+            setMaximumDeliveryCount(virtualHost.getDefaultMaximumDeliveryAttempts());
+        }
+
+        final String ownerString;
+        switch(_exclusivityPolicy)
+        {
+            case PRINCIPAL:
+                ownerString = ((Principal) _exclusiveOwner).getName();
+                break;
+            case CONTAINER:
+                ownerString = (String) _exclusiveOwner;
+                break;
+            default:
+                ownerString = null;
+
+        }
+
         // Log the creation of this Queue.
         // The priorities display is toggled on if we set priorities > 0
         CurrentActor.get().message(_logSubject,
-                                   QueueMessages.CREATED(String.valueOf(_owner),
+                                   QueueMessages.CREATED(ownerString,
                                                          _entries.getPriorities(),
-                                                         _owner != null,
-                                                         autoDelete,
-                                                         durable, !durable,
+                                                         ownerString != null ,
+                                                         _lifetimePolicy != LifetimePolicy.PERMANENT,
+                                                         durable,
+                                                         !durable,
                                                          _entries.getPriorities() > 0));
 
-        if(arguments != null && arguments.containsKey(Queue.MESSAGE_GROUP_KEY))
+        if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY))
         {
-            if(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
-               && (Boolean)(arguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
+            if(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null
+               && (Boolean)(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
             {
-                Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
+                Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
                 _messageGroupManager =
-                        new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
+                        new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)),
                                 defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
                                 this);
             }
             else
             {
-                _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(
+                _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(
                         Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
             }
         }
@@ -256,6 +431,38 @@ abstract class SimpleAMQQueue<E extends 
 
     }
 
+    private void addLifetimeConstraint(final Deletable<? extends Deletable> lifetimeObject)
+    {
+        final Action<Deletable> deleteQueueTask = new Action<Deletable>()
+        {
+            @Override
+            public void performAction(final Deletable object)
+            {
+                try
+                {
+                    getVirtualHost().removeQueue(SimpleAMQQueue.this);
+                }
+                catch (QpidSecurityException e)
+                {
+                    throw new ConnectionScopedRuntimeException("Unable to delete a queue even though the queue's " +
+                                                               "lifetime was tied to an object being deleted");
+                }
+            }
+        };
+
+        lifetimeObject.addDeleteTask(deleteQueueTask);
+        addDeleteTask(new DeleteDeleteTask(lifetimeObject, deleteQueueTask));
+    }
+
+    private void addExclusivityConstraint(final Deletable<? extends Deletable> lifetimeObject)
+    {
+        final ClearOwnerAction clearOwnerAction = new ClearOwnerAction(lifetimeObject);
+        final DeleteDeleteTask deleteDeleteTask = new DeleteDeleteTask(lifetimeObject, clearOwnerAction);
+        clearOwnerAction.setDeleteTask(deleteDeleteTask);
+        lifetimeObject.addDeleteTask(clearOwnerAction);
+        addDeleteTask(deleteDeleteTask);
+    }
+
     public void resetNotifications()
     {
         // This ensure that the notification checks for the configured alerts are created.
@@ -303,12 +510,7 @@ abstract class SimpleAMQQueue<E extends 
 
     public boolean isExclusive()
     {
-        return _exclusive;
-    }
-
-    public void setExclusive(boolean exclusive)
-    {
-        _exclusive = exclusive;
+        return _exclusivityPolicy != ExclusivityPolicy.NONE;
     }
 
     public Exchange getAlternateExchange()
@@ -342,27 +544,27 @@ abstract class SimpleAMQQueue<E extends 
         return _arguments.get(attrName);
     }
 
-    public boolean isAutoDelete()
+    @Override
+    public LifetimePolicy getLifetimePolicy()
     {
-        return _autoDelete;
+        return _lifetimePolicy;
     }
 
     public String getOwner()
     {
-        return _owner;
-    }
-
-    public AuthorizationHolder getAuthorizationHolder()
-    {
-        return _authorizationHolder;
-    }
-
-    public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
-    {
-        _authorizationHolder = authorizationHolder;
+        if(_exclusiveOwner != null)
+        {
+            switch(_exclusivityPolicy)
+            {
+                case CONTAINER:
+                    return (String) _exclusiveOwner;
+                case PRINCIPAL:
+                    return ((Principal)_exclusiveOwner).getName();
+            }
+        }
+        return null;
     }
 
-
     public VirtualHost getVirtualHost()
     {
         return _virtualHost;
@@ -381,7 +583,9 @@ abstract class SimpleAMQQueue<E extends 
                                      final FilterManager filters,
                                      final Class<? extends ServerMessage> messageClass,
                                      final String consumerName,
-                                     EnumSet<Consumer.Option> optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException
+                                     EnumSet<Consumer.Option> optionSet)
+            throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, QpidSecurityException,
+                   ConsumerAccessRefused
     {
 
         // Access control
@@ -396,15 +600,77 @@ abstract class SimpleAMQQueue<E extends 
             throw new ExistingExclusiveConsumer();
         }
 
+        switch(_exclusivityPolicy)
+        {
+            case CONNECTION:
+                if(_exclusiveOwner == null)
+                {
+                    _exclusiveOwner = target.getSessionModel().getConnectionModel();
+                    addExclusivityConstraint(target.getSessionModel().getConnectionModel());
+                }
+                else
+                {
+                    if(_exclusiveOwner != target.getSessionModel().getConnectionModel())
+                    {
+                        throw new ConsumerAccessRefused();
+                    }
+                }
+                break;
+            case SESSION:
+                if(_exclusiveOwner == null)
+                {
+                    _exclusiveOwner = target.getSessionModel();
+                    addExclusivityConstraint(target.getSessionModel());
+                }
+                else
+                {
+                    if(_exclusiveOwner != target.getSessionModel())
+                    {
+                        throw new ConsumerAccessRefused();
+                    }
+                }
+                break;
+            case LINK:
+                if(getConsumerCount() != 0)
+                {
+                    throw new ConsumerAccessRefused();
+                }
+                break;
+            case PRINCIPAL:
+                if(_exclusiveOwner == null)
+                {
+                    _exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+                }
+                else
+                {
+                    if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
+                    {
+                        throw new ConsumerAccessRefused();
+                    }
+                }
+                break;
+            case CONTAINER:
+                if(_exclusiveOwner == null)
+                {
+                    _exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName();
+                }
+                else
+                {
+                    if(!_exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName()))
+                    {
+                        throw new ConsumerAccessRefused();
+                    }
+                }
+                break;
+            case NONE:
+                break;
+            default:
+                throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy);
+        }
 
         boolean exclusive =  optionSet.contains(Consumer.Option.EXCLUSIVE);
         boolean isTransient =  optionSet.contains(Consumer.Option.TRANSIENT);
 
-        if (exclusive && !isTransient && getConsumerCount() != 0)
-        {
-            throw new ExistingConsumerPreventsExclusive();
-        }
-
         QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass,
                                                          optionSet.contains(Consumer.Option.ACQUIRES),
                                                          optionSet.contains(Consumer.Option.SEES_REQUEUES),
@@ -473,11 +739,12 @@ abstract class SimpleAMQQueue<E extends 
             consumer.close();
             // No longer can the queue have an exclusive consumer
             setExclusiveSubscriber(null);
+
             consumer.setQueueContext(null);
 
-            if(!isDeleted() && isExclusive() && getConsumerCount() == 0)
+            if(_exclusivityPolicy == ExclusivityPolicy.LINK)
             {
-                setAuthorizationHolder(null);
+                _exclusiveOwner = null;
             }
 
             if(_messageGroupManager != null)
@@ -495,8 +762,12 @@ abstract class SimpleAMQQueue<E extends 
 
             // auto-delete queues must be deleted if there are no remaining subscribers
 
-            if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0  )
+            if(!consumer.isTransient()
+               && ( _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+                    || _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_LINKS )
+               && getConsumerCount() == 0)
             {
+
                 if (_logger.isInfoEnabled())
                 {
                     _logger.info("Auto-deleting queue:" + this);
@@ -1266,12 +1537,14 @@ abstract class SimpleAMQQueue<E extends 
                     });
     }
 
-    public void addQueueDeleteTask(final Action<AMQQueue> task)
+    @Override
+    public void addDeleteTask(final Action<? super Q> task)
     {
         _deleteTaskList.add(task);
     }
 
-    public void removeQueueDeleteTask(final Action<AMQQueue> task)
+    @Override
+    public void removeDeleteTask(final Action<? super Q> task)
     {
         _deleteTaskList.remove(task);
     }
@@ -1343,9 +1616,9 @@ abstract class SimpleAMQQueue<E extends 
             }
 
 
-            for (Action<AMQQueue> task : _deleteTaskList)
+            for (Action<? super Q> task : _deleteTaskList)
             {
-                task.performAction(this);
+                task.performAction((Q)this);
             }
 
             _deleteTaskList.clear();
@@ -1940,6 +2213,26 @@ abstract class SimpleAMQQueue<E extends 
         return _notificationChecks;
     }
 
+    private static class DeleteDeleteTask implements Action<Deletable>
+    {
+
+        private final Deletable<? extends Deletable> _lifetimeObject;
+        private final Action<? super Deletable> _deleteQueueOwnerTask;
+
+        public DeleteDeleteTask(final Deletable<? extends Deletable> lifetimeObject,
+                                final Action<? super Deletable> deleteQueueOwnerTask)
+        {
+            _lifetimeObject = lifetimeObject;
+            _deleteQueueOwnerTask = deleteQueueOwnerTask;
+        }
+
+        @Override
+        public void performAction(final Deletable object)
+        {
+            _lifetimeObject.removeDeleteTask(_deleteQueueOwnerTask);
+        }
+    }
+
     private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State>
     {
 
@@ -1990,38 +2283,6 @@ abstract class SimpleAMQQueue<E extends 
         return ids;
     }
 
-    public AMQSessionModel getExclusiveOwningSession()
-    {
-        return _exclusiveOwner;
-    }
-
-    public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner)
-    {
-        _exclusive = true;
-        _exclusiveOwner = exclusiveOwner;
-    }
-
-
-    public void configure(QueueConfiguration config)
-    {
-        if (config != null)
-        {
-            setMaximumMessageAge(config.getMaximumMessageAge());
-            setMaximumQueueDepth(config.getMaximumQueueDepth());
-            setMaximumMessageSize(config.getMaximumMessageSize());
-            setMaximumMessageCount(config.getMaximumMessageCount());
-            setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
-            setMaximumDeliveryCount(config.getMaxDeliveryCount());
-            _capacity = config.getCapacity();
-            _flowResumeCapacity = config.getFlowResumeCapacity();
-        }
-    }
-
-    public long getMessageDequeueCount()
-    {
-        return  _dequeueCount.get();
-    }
-
     public long getTotalEnqueueSize()
     {
         return _enqueueSize.get();
@@ -2130,20 +2391,13 @@ abstract class SimpleAMQQueue<E extends 
     @Override
     public void setDescription(String description)
     {
-        if (description == null)
-        {
-            _arguments.remove(Queue.DESCRIPTION);
-        }
-        else
-        {
-            _arguments.put(Queue.DESCRIPTION, description);
-        }
+        _description = description;
     }
 
     @Override
     public String getDescription()
     {
-        return (String) _arguments.get(Queue.DESCRIPTION);
+        return _description;
     }
 
     public final  <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
@@ -2176,4 +2430,228 @@ abstract class SimpleAMQQueue<E extends 
 
     }
 
+    /**
+     * Decides whether the given session may use this queue under the current exclusivity policy.
+     * NONE always permits access. For every other policy access is permitted when no owner (for
+     * LINK: no exclusive subscriber) is recorded, or when the recorded one matches the session,
+     * its connection, that connection's authorized principal or its remote container name,
+     * depending on the policy.
+     *
+     * @param session the session requesting access
+     * @return {@code true} if the session is allowed to use the queue
+     */
+    @Override
+    public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
+    {
+        switch(_exclusivityPolicy)
+        {
+            case NONE:
+                return true;
+            case SESSION:
+                return _exclusiveOwner == null || _exclusiveOwner == session;
+            case CONNECTION:
+                return _exclusiveOwner == null || _exclusiveOwner == session.getConnectionModel();
+            case PRINCIPAL:
+                return _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getAuthorizedPrincipal());
+            case CONTAINER:
+                return _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getRemoteContainerName());
+            case LINK:
+                return _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session;
+            default:
+                throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy);
+        }
+    }
+
+    /**
+     * Changes the exclusivity policy of this queue, re-deriving the exclusive owner for the new policy.
+     * A {@code null} policy is treated as {@link ExclusivityPolicy#NONE}. Nothing happens when the
+     * requested policy is already in force.
+     *
+     * @param desiredPolicy the policy to switch to; {@code null} means {@code NONE}
+     * @throws ExistingConsumerPreventsExclusive if the existing consumers are incompatible with the policy
+     */
+    @Override
+    public synchronized void setExclusivityPolicy(final ExclusivityPolicy desiredPolicy)
+            throws ExistingConsumerPreventsExclusive
+    {
+        // Normalise null before switching: switch(null) throws a NullPointerException, and a null
+        // _exclusivityPolicy would break every later switch on that field.
+        final ExclusivityPolicy newPolicy = desiredPolicy == null ? ExclusivityPolicy.NONE : desiredPolicy;
+        if(newPolicy != _exclusivityPolicy)
+        {
+            switch(newPolicy)
+            {
+                case NONE:
+                    _exclusiveOwner = null;
+                    break;
+                case PRINCIPAL:
+                    switchToPrincipalExclusivity();
+                    break;
+                case CONTAINER:
+                    switchToContainerExclusivity();
+                    break;
+                case CONNECTION:
+                    switchToConnectionExclusivity();
+                    break;
+                case SESSION:
+                    switchToSessionExclusivity();
+                    break;
+                case LINK:
+                    switchToLinkExclusivity();
+                    break;
+            }
+            // Only reached when the helper above did not throw, so a failed switch leaves the policy unchanged.
+            _exclusivityPolicy = newPolicy;
+        }
+    }
+
+    private void switchToLinkExclusivity() throws ExistingConsumerPreventsExclusive
+    {
+        switch (getConsumerCount())
+        {
+            case 1:
+                _exclusiveSubscriber = getConsumerList().getHead().getConsumer();
+                // deliberate fall through
+            case 0:
+                _exclusiveOwner = null;
+                break;
+            default:
+                throw new ExistingConsumerPreventsExclusive();
+        }
+
+    }
+
+    /**
+     * Moves the queue to SESSION exclusivity. From NONE, PRINCIPAL, CONTAINER or CONNECTION the
+     * owner becomes the one session shared by all current consumers (null when there are none);
+     * from LINK it becomes the session of the exclusive subscriber, if any.
+     *
+     * @throws ExistingConsumerPreventsExclusive if the current consumers belong to different sessions
+     */
+    private void switchToSessionExclusivity() throws ExistingConsumerPreventsExclusive
+    {
+        switch(_exclusivityPolicy)
+        {
+            case NONE:
+            case PRINCIPAL:
+            case CONTAINER:
+            case CONNECTION:
+                AMQSessionModel session = null;
+                for(Consumer c : getConsumers())
+                {
+                    if(session == null)
+                    {
+                        session = c.getSessionModel();
+                    }
+                    else if(!session.equals(c.getSessionModel()))
+                    {
+                        throw new ExistingConsumerPreventsExclusive();
+                    }
+                }
+                _exclusiveOwner = session;
+                break;
+            case LINK:
+                // Under SESSION exclusivity the owner must be the session itself: verifySessionAccess
+                // compares _exclusiveOwner with the session by identity, so storing the session's
+                // connection here would lock the owning session out of its own queue.
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel();
+                break;
+        }
+    }
+
+    private void switchToConnectionExclusivity() throws ExistingConsumerPreventsExclusive
+    {
+        switch(_exclusivityPolicy)
+        {
+            case NONE:
+            case CONTAINER:
+            case PRINCIPAL:
+                AMQConnectionModel con = null;
+                for(Consumer c : getConsumers())
+                {
+                    if(con == null)
+                    {
+                        con = c.getSessionModel().getConnectionModel();
+                    }
+                    else if(!con.equals(c.getSessionModel().getConnectionModel()))
+                    {
+                        throw new ExistingConsumerPreventsExclusive();
+                    }
+                }
+                _exclusiveOwner = con;
+                break;
+            case SESSION:
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel();
+                break;
+            case LINK:
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel();
+        }
+    }
+
+    private void switchToContainerExclusivity() throws ExistingConsumerPreventsExclusive
+    {
+        switch(_exclusivityPolicy)
+        {
+            case NONE:
+            case PRINCIPAL:
+                String containerID = null;
+                for(Consumer c : getConsumers())
+                {
+                    if(containerID == null)
+                    {
+                        containerID = c.getSessionModel().getConnectionModel().getRemoteContainerName();
+                    }
+                    else if(!containerID.equals(c.getSessionModel().getConnectionModel().getRemoteContainerName()))
+                    {
+                        throw new ExistingConsumerPreventsExclusive();
+                    }
+                }
+                _exclusiveOwner = containerID;
+                break;
+            case CONNECTION:
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getRemoteContainerName();
+                break;
+            case SESSION:
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getRemoteContainerName();
+                break;
+            case LINK:
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getRemoteContainerName();
+        }
+    }
+
+    private void switchToPrincipalExclusivity() throws ExistingConsumerPreventsExclusive
+    {
+        switch(_exclusivityPolicy)
+        {
+            case NONE:
+            case CONTAINER:
+                Principal principal = null;
+                for(Consumer c : getConsumers())
+                {
+                    if(principal == null)
+                    {
+                        principal = c.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+                    }
+                    else if(!principal.equals(c.getSessionModel().getConnectionModel().getAuthorizedPrincipal()))
+                    {
+                        throw new ExistingConsumerPreventsExclusive();
+                    }
+                }
+                _exclusiveOwner = principal;
+                break;
+            case CONNECTION:
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getAuthorizedPrincipal();
+                break;
+            case SESSION:
+                _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getAuthorizedPrincipal();
+                break;
+            case LINK:
+                _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getAuthorizedPrincipal();
+        }
+    }
+
+    /**
+     * Action that clears this queue's exclusive owner if the owner is still the given lifetime
+     * object, and then removes the associated {@link DeleteDeleteTask} (when one has been set)
+     * from this queue's delete tasks.
+     */
+    private class ClearOwnerAction implements Action<Deletable>
+    {
+        private final Deletable<? extends Deletable> _lifetimeObject;
+        /** Optional companion task; stays null until {@link #setDeleteTask} is called. */
+        private DeleteDeleteTask _deleteTask;
+
+        public ClearOwnerAction(final Deletable<? extends Deletable> lifetimeObject)
+        {
+            _lifetimeObject = lifetimeObject;
+        }
+
+        public void setDeleteTask(final DeleteDeleteTask deleteTask)
+        {
+            _deleteTask = deleteTask;
+        }
+
+        @Override
+        public void performAction(final Deletable object)
+        {
+            // Identity comparison: the owner is only cleared if it is this very object.
+            if(_lifetimeObject == SimpleAMQQueue.this._exclusiveOwner)
+            {
+                SimpleAMQQueue.this._exclusiveOwner = null;
+            }
+            if(_deleteTask == null)
+            {
+                return;
+            }
+            SimpleAMQQueue.this.removeDeleteTask(_deleteTask);
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Mon Feb 17 20:48:05 2014
@@ -21,11 +21,13 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
-import java.util.UUID;
 
 public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
 {
@@ -35,28 +37,26 @@ public class SortedQueue extends OutOfOr
     private final Object _sortedQueueLock = new Object();
     private final String _sortedPropertyName;
 
-    protected SortedQueue(UUID id, final String name,
-                            final boolean durable, final String owner, final boolean autoDelete,
-                            final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
+    protected SortedQueue(VirtualHost virtualHost,
+                          final AMQSessionModel creatingSession,
+                          Map<String, Object> attributes,
+                          QueueEntryListFactory<SortedQueueEntry, SortedQueue, SortedQueueEntryList> factory)
     {
-        this(id, name, durable, owner, autoDelete, exclusive,
-             virtualHost, arguments, sortedPropertyName, new SortedQueueEntryListFactory(sortedPropertyName));
+        super(virtualHost, creatingSession, attributes, factory);
+        _sortedPropertyName = MapValueConverter.getStringAttribute(Queue.SORT_KEY,attributes);
     }
 
 
-    protected SortedQueue(UUID id, final String name,
-                          final boolean durable, final String owner, final boolean autoDelete,
-                          final boolean exclusive, final VirtualHost virtualHost,
-                          Map<String, Object> arguments,
-                          String sortedPropertyName,
-                          QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList> factory)
+    protected SortedQueue(VirtualHost virtualHost,
+                          final AMQSessionModel creatingSession, Map<String, Object> attributes)
     {
-        super(id, name, durable, owner, autoDelete, exclusive,
-              virtualHost, factory, arguments);
-        this._sortedPropertyName = sortedPropertyName;
+        this(virtualHost,
+             creatingSession, attributes,
+             new SortedQueueEntryListFactory(MapValueConverter.getStringAttribute(Queue.SORT_KEY, attributes)));
     }
 
 
+
     public String getSortedPropertyName()
     {
         return _sortedPropertyName;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java Mon Feb 17 20:48:05 2014
@@ -20,22 +20,16 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
-import java.util.UUID;
 
 public class StandardQueue extends SimpleAMQQueue<StandardQueueEntry,StandardQueue,StandardQueueEntryList>
 {
-    public StandardQueue(final UUID id,
-                         final String name,
-                         final boolean durable,
-                         final String owner,
-                         final boolean autoDelete,
-                         final boolean exclusive,
-                         final VirtualHost virtualHost,
-                         final Map<String, Object> arguments)
+    public StandardQueue(final VirtualHost virtualHost,
+                         final AMQSessionModel creatingSession, final Map<String, Object> arguments)
     {
-        super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new StandardQueueEntryList.Factory(), arguments);
+        super(virtualHost, creatingSession, arguments, new StandardQueueEntryList.Factory());
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java Mon Feb 17 20:48:05 2014
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.queue.AMQQueue;
 
 /**
@@ -139,8 +140,8 @@ public class ObjectProperties
     {
         setName(queue.getName());
 
-        put(Property.AUTO_DELETE, queue.isAutoDelete());
-        put(Property.TEMPORARY, queue.isAutoDelete());
+        put(Property.AUTO_DELETE, queue.getLifetimePolicy() != LifetimePolicy.PERMANENT);
+        put(Property.TEMPORARY, queue.getLifetimePolicy() != LifetimePolicy.PERMANENT);
         put(Property.DURABLE, queue.isDurable());
         put(Property.EXCLUSIVE, queue.isExclusive());
         if (queue.getAlternateExchange() != null)
@@ -151,10 +152,7 @@ public class ObjectProperties
         {
             put(Property.OWNER, queue.getOwner());
         }
-        else if (queue.getAuthorizationHolder() != null)
-        {
-            put(Property.OWNER, queue.getAuthorizationHolder().getAuthorizedPrincipal().getName());
-        }
+
     }
 
     public ObjectProperties(Exchange exch, AMQQueue queue, String routingKey)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java Mon Feb 17 20:48:05 2014
@@ -42,17 +42,11 @@ public class DurableConfigurationStoreHe
     private static final String BINDING = Binding.class.getSimpleName();
     private static final String EXCHANGE = Exchange.class.getSimpleName();
     private static final String QUEUE = Queue.class.getSimpleName();
-    private static final Set<String> QUEUE_ARGUMENTS_EXCLUDES = new HashSet<String>(Arrays.asList(Queue.NAME,
-                                                                                                  Queue.OWNER,
-                                                                                                  Queue.EXCLUSIVE,
-                                                                                                  Queue.ALTERNATE_EXCHANGE));
+    private static final Set<String> QUEUE_ARGUMENTS_EXCLUDES = new HashSet<String>(Arrays.asList(Queue.ALTERNATE_EXCHANGE));
 
     public static void updateQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue)
     {
         Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
-        attributesMap.put(Queue.NAME, queue.getName());
-        attributesMap.put(Queue.OWNER, queue.getOwner());
-        attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
 
         if (queue.getAlternateExchange() != null)
         {
@@ -75,9 +69,6 @@ public class DurableConfigurationStoreHe
     public static void createQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue)
     {
         Map<String, Object> attributesMap = new HashMap<String, Object>();
-        attributesMap.put(Queue.NAME, queue.getName());
-        attributesMap.put(Queue.OWNER, queue.getOwner());
-        attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
         if (queue.getAlternateExchange() != null)
         {
             attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
@@ -103,7 +94,7 @@ public class DurableConfigurationStoreHe
         Map<String, Object> attributesMap = new HashMap<String, Object>();
         attributesMap.put(Exchange.NAME, exchange.getName());
         attributesMap.put(Exchange.TYPE, exchange.getTypeName());
-        attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name()
+        attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name()
                 : LifetimePolicy.PERMANENT.name());
 
         store.create(exchange.getId(), EXCHANGE, attributesMap);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/MapValueConverter.java Mon Feb 17 20:48:05 2014
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 public class MapValueConverter
 {
@@ -217,6 +218,13 @@ public class MapValueConverter
         return getIntegerAttribute(name, attributes, null);
     }
 
+    /**
+     * Returns the attribute {@code name} converted to a {@code Long} via {@code toLong}, with no
+     * default value. {@code assertMandatoryAttribute} is applied first (presumably rejecting a
+     * missing attribute).
+     */
+    public static Long getLongAttribute(String name, Map<String,Object> attributes)
+    {
+        assertMandatoryAttribute(name, attributes);
+        return toLong(name, attributes.get(name), null);
+    }
+
     public static Long getLongAttribute(String name, Map<String,Object> attributes, Long defaultValue)
     {
         Object obj = attributes.get(name);
@@ -409,4 +417,41 @@ public class MapValueConverter
         return (T) value;
     }
 
+
+    /**
+     * Returns the attribute {@code name} as a {@link UUID}, with no default value.
+     * {@code assertMandatoryAttribute} is applied first (presumably rejecting a missing attribute).
+     */
+    public static UUID getUUIDAttribute(String name, Map<String, Object> attributes)
+    {
+        assertMandatoryAttribute(name, attributes);
+        final UUID noDefault = null;
+        return getUUIDAttribute(name, attributes, noDefault);
+    }
+
+    /**
+     * Returns the attribute {@code name} as a {@link UUID}, or {@code defaultVal} when the map
+     * holds no value (or a null value) under that name.
+     */
+    public static UUID getUUIDAttribute(String name, Map<String,Object> attributes, UUID defaultVal)
+    {
+        return toUUID(attributes.get(name), defaultVal);
+    }
+
+    /**
+     * Converts an attribute value to a {@link UUID}.
+     *
+     * @param value      a {@code UUID}, a {@code String} in the format accepted by
+     *                   {@link UUID#fromString(String)}, a {@code byte[]}, or {@code null}
+     * @param defaultVal the result for a {@code null} value
+     * @return the converted UUID, or {@code defaultVal} when {@code value} is {@code null}
+     * @throws IllegalArgumentException if the value has any other type
+     */
+    private static UUID toUUID(final Object value, final UUID defaultVal)
+    {
+        if(value == null)
+        {
+            return defaultVal;
+        }
+        if(value instanceof UUID)
+        {
+            return (UUID)value;
+        }
+        if(value instanceof String)
+        {
+            return UUID.fromString((String)value);
+        }
+        if(value instanceof byte[])
+        {
+            // Note: this derives a name-based UUID from the bytes; it does not decode a raw 16-byte UUID.
+            return UUID.nameUUIDFromBytes((byte[])value);
+        }
+        throw new IllegalArgumentException("Cannot convert " + value.getClass().getName() + " to UUID");
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Feb 17 20:48:05 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualho
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -34,6 +35,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.server.configuration.ExchangeConfiguration;
 import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -69,7 +73,9 @@ import org.apache.qpid.server.store.Dura
 import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
 import org.apache.qpid.server.store.Event;
 import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
@@ -529,7 +535,10 @@ public abstract class AbstractVirtualHos
             int purged = queue.delete();
 
             getQueueRegistry().unregisterQueue(queue.getName());
-            if (queue.isDurable() && !queue.isAutoDelete())
+            if (queue.isDurable() && !(queue.getLifetimePolicy()
+                                       == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+                                       || queue.getLifetimePolicy()
+                                          == LifetimePolicy.DELETE_ON_SESSION_END))
             {
                 DurableConfigurationStore store = getDurableConfigurationStore();
                 DurableConfigurationStoreHelper.removeQueue(store, queue);
@@ -538,26 +547,24 @@ public abstract class AbstractVirtualHos
         }
     }
 
-    @Override
-    public AMQQueue createQueue(UUID id,
-                                String queueName,
-                                boolean durable,
-                                String owner,
-                                boolean autoDelete,
-                                boolean exclusive,
-                                boolean deleteOnNoConsumer,
-                                Map<String, Object> arguments) throws QpidSecurityException, QueueExistsException
+    public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> attributes) throws QpidSecurityException, QueueExistsException
     {
+        // make a copy as we may augment (with an ID for example)
+        attributes = new LinkedHashMap<String, Object>(attributes);
 
-        if (queueName == null)
-        {
-            throw new IllegalArgumentException("Queue name must not be null");
-        }
+        String queueName = MapValueConverter.getStringAttribute(Queue.NAME, attributes);
+        boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+                                                                Queue.LIFETIME_POLICY,
+                                                                attributes,
+                                                                LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
+        boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false);
+        ExclusivityPolicy exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE);
+        String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
 
-                // Access check
+        // Access check
         if (!getSecurityManager().authoriseCreateQueue(autoDelete,
                                                        durable,
-                                                       exclusive,
+                                                       exclusive != null && exclusive != ExclusivityPolicy.NONE,
                                                        null,
                                                        null,
                                                        queueName,
@@ -573,22 +580,27 @@ public abstract class AbstractVirtualHos
             {
                 throw new QueueExistsException("Queue with name " + queueName + " already exists", _queueRegistry.getQueue(queueName));
             }
-            if(id == null)
+            if(!attributes.containsKey(Queue.ID))
             {
 
-                id = UUIDGenerator.generateExchangeUUID(queueName, getName());
+                UUID id = UUIDGenerator.generateExchangeUUID(queueName, getName());
                 while(_queueRegistry.getQueue(id) != null)
                 {
                     id = UUID.randomUUID();
                 }
+                attributes.put(Queue.ID, id);
 
             }
-            else if(_queueRegistry.getQueue(id) != null)
+            else if(_queueRegistry.getQueue(MapValueConverter.getUUIDAttribute(Queue.ID, attributes)) != null)
             {
-                throw new QueueExistsException("Queue with id " + id + " already exists", _queueRegistry.getQueue(queueName));
+                throw new QueueExistsException("Queue with id "
+                                               + MapValueConverter.getUUIDAttribute(Queue.ID,
+                                                                                    attributes)
+                                               + " already exists", _queueRegistry.getQueue(queueName));
             }
-            return _queueFactory.createQueue(id, queueName, durable, owner, autoDelete, exclusive, deleteOnNoConsumer,
-                    arguments);
+
+
+            return _queueFactory.createQueue(creatingSession, attributes);
         }
 
     }
@@ -980,13 +992,13 @@ public abstract class AbstractVirtualHos
                     // house keeping task from running.
                 }
             }
-            for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+            for (AMQConnectionModel<?,?> connection : getConnectionRegistry().getConnections())
             {
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Checking for long running open transactions on connection " + connection);
                 }
-                for (AMQSessionModel session : connection.getSessionModels())
+                for (AMQSessionModel<?,?> session : connection.getSessionModels())
                 {
                     if (_logger.isDebugEnabled())
                     {
@@ -1046,5 +1058,54 @@ public abstract class AbstractVirtualHos
         {
             return _model;
         }
+
+    }
+    // Default message-age alert threshold: the configuration's maximum message age.
+    @Override
+    public long getDefaultAlertThresholdMessageAge()
+    {
+        return getConfiguration().getMaximumMessageAge();
+    }
+    // Default message-size alert threshold: the configuration's maximum message size.
+    @Override
+    public long getDefaultAlertThresholdMessageSize()
+    {
+        return getConfiguration().getMaximumMessageSize();
+    }
+    // Default queue-depth (messages) alert threshold: the configuration's maximum message count.
+    @Override
+    public long getDefaultAlertThresholdQueueDepthMessages()
+    {
+        return getConfiguration().getMaximumMessageCount();
+    }
+    // Default queue-depth (bytes) alert threshold: the configuration's maximum queue depth.
+    @Override
+    public long getDefaultAlertThresholdQueueDepthBytes()
+    {
+        return getConfiguration().getMaximumQueueDepth();
+    }
+    // Default alert repeat gap: the configuration's minimum alert repeat gap.
+    @Override
+    public long getDefaultAlertRepeatGap()
+    {
+        return getConfiguration().getMinimumAlertRepeatGap();
+    }
+    // Default flow-control size in bytes: the configuration's capacity.
+    @Override
+    public long getDefaultQueueFlowControlSizeBytes()
+    {
+        return getConfiguration().getCapacity();
+    }
+    // Default flow-resume size in bytes: the configuration's flow resume capacity.
+    @Override
+    public long getDefaultQueueFlowResumeSizeBytes()
+    {
+        return getConfiguration().getFlowResumeCapacity();
+    }
+    // Default maximum delivery attempts: the configuration's max delivery count.
+    @Override
+    public int getDefaultMaximumDeliveryAttempts()
+    {
+        return getConfiguration().getMaxDeliveryCount();
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java Mon Feb 17 20:48:05 2014
@@ -42,6 +42,7 @@ import static org.apache.qpid.server.mod
 
 public class DefaultUpgraderProvider implements UpgraderProvider
 {
+    public static final String EXCLUSIVE = "exclusive";
     private final ExchangeRegistry _exchangeRegistry;
     private final VirtualHost _virtualHost;
 
@@ -63,7 +64,8 @@ public class DefaultUpgraderProvider imp
                 currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader());
             case 2:
                 currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader());
-
+            case 3:
+                currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader());
             case CURRENT_CONFIG_VERSION:
                 currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer));
                 break;
@@ -263,4 +265,49 @@ public class DefaultUpgraderProvider imp
             }
         }
 
+    /*
+     * Converts the stored queue attribute "exclusive" from a boolean to an enum name:
+     * false becomes "NONE" and true becomes "CONTAINER". The "owner" attribute is
+     * removed unless the stored value was boolean true (i.e. is now "CONTAINER").
+     */
+    private class Version3Upgrader extends NonNullUpgrader
+    {
+
+        @Override
+        public void configuredObject(UUID id, String type, Map<String, Object> attributes)
+        {
+            if(Queue.class.getSimpleName().equals(type))
+            {
+                Map<String, Object> newAttributes = new LinkedHashMap<String, Object>(attributes);
+                if(attributes.get(EXCLUSIVE) instanceof Boolean)
+                {
+                    boolean isExclusive = (Boolean) attributes.get(EXCLUSIVE);
+                    newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE");
+                    if(!isExclusive && attributes.containsKey("owner"))
+                    {
+                        newAttributes.remove("owner");
+                    }
+                }
+                else
+                {
+                    newAttributes.remove("owner");
+                }
+                if(!attributes.containsKey("durable"))
+                {
+                    newAttributes.put("durable","true");
+                }
+                attributes = newAttributes;
+                getUpdateMap().put(id, new ConfiguredObjectRecord(id,type,attributes));
+            }
+
+            getNextUpgrader().configuredObject(id,type,attributes);
+        }
+
+        @Override
+        public void complete()
+        {
+            getNextUpgrader().complete();
+        }
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java Mon Feb 17 20:48:05 2014
@@ -69,7 +69,7 @@ public class ExchangeRecoverer extends A
             String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE);
             String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY);
             boolean autoDelete = lifeTimePolicy == null
-                                 || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE;
+                                 || LifetimePolicy.valueOf(lifeTimePolicy) != LifetimePolicy.PERMANENT;
             try
             {
                 _exchange = _exchangeRegistry.getExchange(id);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java Mon Feb 17 20:48:05 2014
@@ -104,13 +104,6 @@ public class QueueRecoverer extends Abst
         public AMQQueue resolve()
         {
             String queueName = (String) _attributes.get(Queue.NAME);
-            String owner = (String) _attributes.get(Queue.OWNER);
-            boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE);
-
-            Map<String, Object> queueArgumentsMap = new LinkedHashMap<String, Object>(_attributes);
-            queueArgumentsMap.remove(Queue.NAME);
-            queueArgumentsMap.remove(Queue.OWNER);
-            queueArgumentsMap.remove(Queue.EXCLUSIVE);
 
             try
             {
@@ -122,8 +115,9 @@ public class QueueRecoverer extends Abst
 
                 if (_queue == null)
                 {
-                    _queue = _queueFactory.restoreQueue(_id, queueName, owner, false, exclusive,
-                            false, queueArgumentsMap);
+                    Map<String, Object> attributes = new LinkedHashMap<String, Object>(_attributes);
+                    attributes.put(Queue.ID, _id);
+                    _queue = _queueFactory.restoreQueue(attributes);
                 }
             }
             catch (QpidSecurityException e)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Mon Feb 17 20:48:05 2014
@@ -33,6 +33,7 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.QpidSecurityException;
@@ -59,14 +60,7 @@ public interface VirtualHost extends Dur
 
     int removeQueue(AMQQueue queue) throws QpidSecurityException;
 
-    AMQQueue createQueue(UUID id,
-                         String queueName,
-                         boolean durable,
-                         String owner,
-                         boolean autoDelete,
-                         boolean exclusive,
-                         boolean deleteOnNoConsumer,
-                         Map<String, Object> arguments) throws QueueExistsException, QpidSecurityException;
+    AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments) throws QueueExistsException, QpidSecurityException;
 
 
     Exchange createExchange(UUID id,
@@ -130,4 +124,20 @@ public interface VirtualHost extends Dur
     public void block();
 
     public void unblock();
+
+    long getDefaultAlertThresholdMessageAge();
+
+    long getDefaultAlertThresholdMessageSize();
+
+    long getDefaultAlertThresholdQueueDepthMessages();
+
+    long getDefaultAlertThresholdQueueDepthBytes();
+
+    long getDefaultAlertRepeatGap();
+
+    long getDefaultQueueFlowControlSizeBytes();
+
+    long getDefaultQueueFlowResumeSizeBytes();
+
+    int getDefaultMaximumDeliveryAttempts();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java Mon Feb 17 20:48:05 2014
@@ -137,9 +137,6 @@ public class QueueConfigurationTest exte
         qConf = new QueueConfiguration("test", vhostConfig);
         assertEquals(2, qConf.getMaximumMessageAge());
 
-        // Check inherited value
-        qConf = new QueueConfiguration("test", _fullHostConf);
-        assertEquals(1, qConf.getMaximumMessageAge());
     }
 
     public void testGetMaximumQueueDepth() throws ConfigurationException
@@ -153,9 +150,6 @@ public class QueueConfigurationTest exte
         qConf = new QueueConfiguration("test", vhostConfig);
         assertEquals(2, qConf.getMaximumQueueDepth());
 
-        // Check inherited value
-        qConf = new QueueConfiguration("test", _fullHostConf);
-        assertEquals(1, qConf.getMaximumQueueDepth());
     }
 
     public void testGetMaximumMessageSize() throws ConfigurationException
@@ -169,9 +163,6 @@ public class QueueConfigurationTest exte
         qConf = new QueueConfiguration("test", vhostConfig);
         assertEquals(2, qConf.getMaximumMessageSize());
 
-        // Check inherited value
-        qConf = new QueueConfiguration("test", _fullHostConf);
-        assertEquals(1, qConf.getMaximumMessageSize());
     }
 
     public void testGetMaximumMessageCount() throws ConfigurationException
@@ -185,22 +176,11 @@ public class QueueConfigurationTest exte
         qConf = new QueueConfiguration("test", vhostConfig);
         assertEquals(2, qConf.getMaximumMessageCount());
 
-        // Check inherited value
-        qConf = new QueueConfiguration("test", _fullHostConf);
-        assertEquals(1, qConf.getMaximumMessageCount());
     }
 
     public void testGetMinimumAlertRepeatGap() throws Exception
     {
-        // set broker attribute ALERT_REPEAT_GAP to 10
-        when(_broker.getAttribute(Broker.QUEUE_ALERT_REPEAT_GAP)).thenReturn(10);
-
-        // check that broker level setting is available on queue configuration
         QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
-        assertEquals(10, qConf.getMinimumAlertRepeatGap());
-
-        // remove configuration for ALERT_REPEAT_GAP on broker level
-        when(_broker.getAttribute(Broker.QUEUE_ALERT_REPEAT_GAP)).thenReturn(null);
 
         // Check default value
         qConf = new QueueConfiguration("test", _emptyConf);
@@ -211,9 +191,6 @@ public class QueueConfigurationTest exte
         qConf = new QueueConfiguration("test", vhostConfig);
         assertEquals(2, qConf.getMinimumAlertRepeatGap());
 
-        // Check inherited value
-        qConf = new QueueConfiguration("test", _fullHostConf);
-        assertEquals(1, qConf.getMinimumAlertRepeatGap());
     }
 
     public void testSortQueueConfiguration() throws ConfigurationException

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Mon Feb 17 20:48:05 2014
@@ -35,8 +35,10 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
 
+import java.security.Principal;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -333,14 +335,26 @@ public class MockConsumer implements Con
         }
 
         @Override
-        public int compareTo(AMQSessionModel o)
+        public void close(AMQConstant cause, String message)
         {
-            return getId().compareTo(o.getId());
         }
 
         @Override
-        public void close(AMQConstant cause, String message)
+        public void addDeleteTask(final Action task)
         {
+
+        }
+
+        @Override
+        public void removeDeleteTask(final Action task)
+        {
+
+        }
+
+        @Override
+        public int compareTo(final Object o)
+        {
+            return 0;
         }
     }
 
@@ -431,12 +445,6 @@ public class MockConsumer implements Con
         }
 
         @Override
-        public String getUserName()
-        {
-            return null;
-        }
-
-        @Override
         public boolean isSessionNameUnique(byte[] name)
         {
             return false;
@@ -455,6 +463,12 @@ public class MockConsumer implements Con
         }
 
         @Override
+        public String getRemoteContainerName()
+        {
+            return null;
+        }
+
+        @Override
         public String getClientVersion()
         {
             return null;
@@ -467,7 +481,7 @@ public class MockConsumer implements Con
         }
 
         @Override
-        public String getPrincipalAsString()
+        public Principal getAuthorizedPrincipal()
         {
             return null;
         }
@@ -512,5 +526,17 @@ public class MockConsumer implements Con
         {
             return null;
         }
+
+        @Override
+        public void addDeleteTask(final Action task)
+        {
+
+        }
+
+        @Override
+        public void removeDeleteTask(final Action task)
+        {
+
+        }
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Mon Feb 17 20:48:05 2014
@@ -20,17 +20,23 @@
  */
 package org.apache.qpid.server.exchange;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
 import junit.framework.Assert;
 
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -70,10 +76,17 @@ public class TopicExchangeTest extends Q
         }
     }
 
+    private AMQQueue<?,?,?> createQueue(String name) throws QpidSecurityException, QueueExistsException
+    {
+        Map<String,Object> attributes = new HashMap<String, Object>();
+        attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+        attributes.put(Queue.NAME, name);
+        return _vhost.createQueue(null, attributes);
+    }
+
     public void testNoRoute() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false,
-                false, null);
+        AMQQueue<?,?,?> queue = createQueue("a*#b");
         _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
 
 
@@ -84,8 +97,7 @@ public class TopicExchangeTest extends Q
 
     public void testDirectMatch() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
-                false, null);
+        AMQQueue<?,?,?> queue = createQueue("ab");
         _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
 
 
@@ -107,7 +119,7 @@ public class TopicExchangeTest extends Q
 
     public void testStarMatch() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
+        AMQQueue<?,?,?> queue = createQueue("a*");
         _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null));
 
 
@@ -138,7 +150,7 @@ public class TopicExchangeTest extends Q
 
     public void testHashMatch() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
+        AMQQueue<?,?,?> queue = createQueue("a#");
         _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null));
 
 
@@ -189,8 +201,7 @@ public class TopicExchangeTest extends Q
 
     public void testMidHash() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
-                false, null);
+        AMQQueue<?,?,?> queue = createQueue("a");
         _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
 
         routeMessage("a.c.d.b",0l);
@@ -215,8 +226,7 @@ public class TopicExchangeTest extends Q
 
     public void testMatchAfterHash() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
-                false, null);
+        AMQQueue<?,?,?> queue = createQueue("a#");
         _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null));
 
 
@@ -254,8 +264,7 @@ public class TopicExchangeTest extends Q
 
     public void testHashAfterHash() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
-                false, null);
+        AMQQueue<?,?,?> queue = createQueue("a#");
         _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null));
 
         int queueCount = routeMessage("a.c.b.b.c",0l);
@@ -276,8 +285,7 @@ public class TopicExchangeTest extends Q
 
     public void testHashHash() throws Exception
     {
-        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
-                false, null);
+        AMQQueue<?,?,?> queue = createQueue("a#");
         _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null));
 
         int queueCount = routeMessage("a.c.b.b.c",0l);
@@ -298,8 +306,7 @@ public class TopicExchangeTest extends Q
 
     public void testSubMatchFails() throws Exception
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
-                false, null);
+        AMQQueue queue = createQueue("a");
         _exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null));
 
         int queueCount = routeMessage("a.b.c",0l);
@@ -328,8 +335,7 @@ public class TopicExchangeTest extends Q
 
     public void testMoreRouting() throws Exception
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
-                false, null);
+        AMQQueue queue = createQueue("a");
         _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
 
 
@@ -342,8 +348,7 @@ public class TopicExchangeTest extends Q
 
     public void testMoreQueue() throws Exception
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
-                false, null);
+        AMQQueue queue = createQueue("a");
         _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
 
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java?rev=1569109&r1=1569108&r2=1569109&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java Mon Feb 17 20:48:05 2014
@@ -22,6 +22,8 @@ package org.apache.qpid.server.logging.s
 
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 
+import java.security.Principal;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -43,9 +45,12 @@ public class ConnectionLogSubjectTest ex
     {
         super.setUp();
 
+        final Principal principal = mock(Principal.class);
+        when(principal.getName()).thenReturn(USER);
+
         _connection = mock(AMQConnectionModel.class);
         when(_connection.getConnectionId()).thenReturn(CONNECTION_ID);
-        when(_connection.getPrincipalAsString()).thenReturn(USER);
+        when(_connection.getAuthorizedPrincipal()).thenReturn(principal);
         when(_connection.getRemoteAddressString()).thenReturn("/"+IP_STRING);
         when(_connection.getVirtualHostName()).thenReturn(VHOST);
         _subject = new ConnectionLogSubject(_connection);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org