You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/17 21:19:38 UTC

svn commit: r1569102 [4/5] - in /qpid/branches/java-broker-amqp-1-0-management/java: amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/ bdbstore/src/test/java/org/ap...

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java Mon Feb 17 20:19:36 2014
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.times;
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 
@@ -40,6 +41,7 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
@@ -143,7 +145,7 @@ public abstract class AbstractDurableCon
         verify(_recoveryHandler).configuredObject(eq(_exchangeId), eq(EXCHANGE),
                 eq(map( org.apache.qpid.server.model.Exchange.NAME, getName(),
                         org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
-                        org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.AUTO_DELETE.toString())));
+                        org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
     }
 
     private Map<String,Object> map(Object... vals)
@@ -220,7 +222,7 @@ public abstract class AbstractDurableCon
         Map<String, Object> queueAttributes = new HashMap<String, Object>();
         queueAttributes.put(Queue.NAME, getName());
         queueAttributes.put(Queue.OWNER, getName()+"Owner");
-        queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+        queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
         verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
     }
 
@@ -240,7 +242,7 @@ public abstract class AbstractDurableCon
 
         queueAttributes.put(Queue.NAME, getName());
         queueAttributes.put(Queue.OWNER, getName()+"Owner");
-        queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+        queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
         queueAttributes.putAll(attributes);
 
         verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -258,7 +260,7 @@ public abstract class AbstractDurableCon
         Map<String, Object> queueAttributes = new HashMap<String, Object>();
         queueAttributes.put(Queue.NAME, getName());
         queueAttributes.put(Queue.OWNER, getName()+"Owner");
-        queueAttributes.put(Queue.EXCLUSIVE, Boolean.TRUE);
+        queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
         queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
 
         verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -292,8 +294,6 @@ public abstract class AbstractDurableCon
         Map<String,Object> queueAttributes = new HashMap<String, Object>();
 
         queueAttributes.put(Queue.NAME, getName());
-        queueAttributes.put(Queue.OWNER, getName()+"Owner");
-        queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
         queueAttributes.putAll(attributes);
 
         verify(_recoveryHandler).configuredObject(eq(_queueId), eq(QUEUE), eq(queueAttributes));
@@ -320,8 +320,6 @@ public abstract class AbstractDurableCon
         Map<String,Object> queueAttributes = new HashMap<String, Object>();
 
         queueAttributes.put(Queue.NAME, getName());
-        queueAttributes.put(Queue.OWNER, getName()+"Owner");
-        queueAttributes.put(Queue.EXCLUSIVE, Boolean.FALSE);
         queueAttributes.putAll(attributes);
         queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
 
@@ -361,13 +359,19 @@ public abstract class AbstractDurableCon
     {
         AMQQueue queue = mock(AMQQueue.class);
         when(queue.getName()).thenReturn(queueName);
-        when(queue.getOwner()).thenReturn(queueOwner);
         when(queue.isExclusive()).thenReturn(exclusive);
         when(queue.getId()).thenReturn(_queueId);
         when(queue.getAlternateExchange()).thenReturn(alternateExchange);
-        if(arguments != null && !arguments.isEmpty())
+        final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments);
+        attributes.put(Queue.NAME, queueName);
+        if(exclusive)
         {
-            when(queue.getAvailableAttributes()).thenReturn(arguments.keySet());
+            when(queue.getOwner()).thenReturn(queueOwner);
+
+            attributes.put(Queue.OWNER, queueOwner);
+            attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER);
+        }
+            when(queue.getAvailableAttributes()).thenReturn(attributes.keySet());
             final ArgumentCaptor<String> requestedAttribute = ArgumentCaptor.forClass(String.class);
             when(queue.getAttribute(requestedAttribute.capture())).then(
                     new Answer()
@@ -377,10 +381,9 @@ public abstract class AbstractDurableCon
                         public Object answer(final InvocationOnMock invocation) throws Throwable
                         {
                             String attrName = requestedAttribute.getValue();
-                            return arguments.get(attrName);
+                            return attributes.get(attrName);
                         }
                     });
-        }
 
         return queue;
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java Mon Feb 17 20:19:36 2014
@@ -25,11 +25,13 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.net.SocketAddress;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -182,8 +184,10 @@ public class BrokerTestHelper
     public static AMQQueue createQueue(String queueName, VirtualHost virtualHost)
             throws QpidSecurityException, QueueExistsException
     {
-        AMQQueue queue = virtualHost.createQueue(UUIDGenerator.generateRandomUUID(), queueName, false, null,
-                false, false, false, Collections.<String, Object>emptyMap());
+        Map<String,Object> attributes = new HashMap<String, Object>();
+        attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
+        attributes.put(Queue.NAME, queueName);
+        AMQQueue queue = virtualHost.createQueue(null, attributes);
         return queue;
     }
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java Mon Feb 17 20:19:36 2014
@@ -46,6 +46,7 @@ import org.apache.qpid.server.store.Conf
 import org.apache.qpid.server.store.DurableConfigurationRecoverer;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
+import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
@@ -117,14 +118,11 @@ public class DurableConfigurationRecover
 
 
 
-        final ArgumentCaptor<UUID> idArg = ArgumentCaptor.forClass(UUID.class);
-        final ArgumentCaptor<String> queueArg = ArgumentCaptor.forClass(String.class);
-        final ArgumentCaptor<Map> argsArg = ArgumentCaptor.forClass(Map.class);
+        final ArgumentCaptor<Map> attributesArg = ArgumentCaptor.forClass(Map.class);
 
         _queueFactory = mock(QueueFactory.class);
 
-        when(_queueFactory.restoreQueue(idArg.capture(), queueArg.capture(),
-                anyString(), anyBoolean(), anyBoolean(), anyBoolean(), argsArg.capture())).then(
+        when(_queueFactory.restoreQueue(attributesArg.capture())).then(
                 new Answer()
                 {
 
@@ -133,8 +131,9 @@ public class DurableConfigurationRecover
                     {
                         final AMQQueue queue = mock(AMQQueue.class);
 
-                        final String queueName = queueArg.getValue();
-                        final UUID queueId = idArg.getValue();
+                        final Map attributes = attributesArg.getValue();
+                        final String queueName = (String) attributes.get(Queue.NAME);
+                        final UUID queueId = MapValueConverter.getUUIDAttribute(Queue.ID, attributes);
 
                         when(queue.getName()).thenReturn(queueName);
                         when(queue.getId()).thenReturn(queueId);
@@ -153,10 +152,10 @@ public class DurableConfigurationRecover
                                         return null;
                                     }
                                 }
-                        ).when(queue).setAlternateExchange(altExchangeArg.capture());
+                                ).when(queue).setAlternateExchange(altExchangeArg.capture());
 
-                        Map args = argsArg.getValue();
-                        if(args.containsKey(Queue.ALTERNATE_EXCHANGE))
+                        Map args = attributes;
+                        if (args.containsKey(Queue.ALTERNATE_EXCHANGE))
                         {
                             final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString());
                             final Exchange exchange = _exchangeRegistry.getExchange(exchangeId);
@@ -470,7 +469,6 @@ public class DurableConfigurationRecover
         {
             queue.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeId.toString());
         }
-        queue.put(Queue.EXCLUSIVE, false);
 
         return queue;
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Mon Feb 17 20:19:36 2014
@@ -29,6 +29,7 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
@@ -152,14 +153,7 @@ public class MockVirtualHost implements 
     }
 
     @Override
-    public AMQQueue createQueue(UUID id,
-                                String queueName,
-                                boolean durable,
-                                String owner,
-                                boolean autoDelete,
-                                boolean exclusive,
-                                boolean deleteOnNoConsumer,
-                                Map<String, Object> arguments)
+    public AMQQueue createQueue(final AMQSessionModel creatingSession, Map<String, Object> arguments)
     {
         return null;
     }
@@ -314,4 +308,52 @@ public class MockVirtualHost implements 
     public void unblock()
     {
     }
+
+    @Override
+    public long getDefaultAlertThresholdMessageAge()
+    {
+        return 0;
+    }
+
+    @Override
+    public long getDefaultAlertThresholdMessageSize()
+    {
+        return 0;
+    }
+
+    @Override
+    public long getDefaultAlertThresholdQueueDepthMessages()
+    {
+        return 0;
+    }
+
+    @Override
+    public long getDefaultAlertThresholdQueueDepthBytes()
+    {
+        return 0;
+    }
+
+    @Override
+    public long getDefaultAlertRepeatGap()
+    {
+        return 0;
+    }
+
+    @Override
+    public long getDefaultQueueFlowControlSizeBytes()
+    {
+        return 0;
+    }
+
+    @Override
+    public long getDefaultQueueFlowResumeSizeBytes()
+    {
+        return 0;
+    }
+
+    @Override
+    public int getDefaultMaximumDeliveryAttempts()
+    {
+        return 0;
+    }
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Mon Feb 17 20:19:36 2014
@@ -25,6 +25,7 @@ import java.security.Principal;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.security.auth.Subject;
@@ -42,6 +43,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Connection;
@@ -56,7 +58,8 @@ import static org.apache.qpid.server.log
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
 
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
+public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>,
+                                                            LogSubject, AuthorizationHolder
 {
     private Runnable _onOpenTask;
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
@@ -72,6 +75,10 @@ public class ServerConnection extends Co
     private AtomicLong _lastIoTime = new AtomicLong();
     private boolean _blocking;
     private Transport _transport;
+
+    private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList =
+            new CopyOnWriteArrayList<Action<? super ServerConnection>>();
+
     private volatile boolean _stopped;
 
     public ServerConnection(final long connectionId, Broker broker)
@@ -197,7 +204,7 @@ public class ServerConnection extends Co
         _onOpenTask = task;
     }
 
-    public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+    public void closeSession(ServerSession session, AMQConstant cause, String message)
     {
         ExecutionException ex = new ExecutionException();
         ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
@@ -211,7 +218,7 @@ public class ServerConnection extends Co
         }
         ex.setErrorCode(code);
         ex.setDescription(message);
-        ((ServerSession)session).invoke(ex);
+        session.invoke(ex);
 
         session.close(cause, message);
     }
@@ -315,6 +322,7 @@ public class ServerConnection extends Co
     public void close(AMQConstant cause, String message)
     {
         closeSubscriptions();
+        performDeleteTasks();
         ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
         try
         {
@@ -327,6 +335,14 @@ public class ServerConnection extends Co
         close(replyCode, message);
     }
 
+    protected void performDeleteTasks()
+    {
+        for(Action<? super ServerConnection> task : _taskList)
+        {
+            task.performAction(this);
+        }
+    }
+
     public synchronized void block()
     {
         if(!_blocking)
@@ -367,12 +383,12 @@ public class ServerConnection extends Co
         super.removeSession(ssn);
     }
 
-    public List<AMQSessionModel> getSessionModels()
+    public List<ServerSession> getSessionModels()
     {
-        List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
+        List<ServerSession> sessions = new ArrayList<ServerSession>();
         for (Session ssn : getChannels())
         {
-            sessions.add((AMQSessionModel) ssn);
+            sessions.add((ServerSession) ssn);
         }
         return sessions;
     }
@@ -475,14 +491,10 @@ public class ServerConnection extends Co
         return String.valueOf(getRemoteAddress());
     }
 
-    public String getUserName()
-    {
-        return _authorizedPrincipal.getName();
-    }
-
     @Override
     public void closed()
     {
+        performDeleteTasks();
         closeSubscriptions();
         super.closed();
     }
@@ -522,6 +534,12 @@ public class ServerConnection extends Co
     }
 
     @Override
+    public String getRemoteContainerName()
+    {
+        return getConnectionDelegate().getClientId();
+    }
+
+    @Override
     public String getClientVersion()
     {
         return getConnectionDelegate().getClientVersion();
@@ -533,11 +551,6 @@ public class ServerConnection extends Co
         return getConnectionDelegate().getClientProduct();
     }
 
-    public String getPrincipalAsString()
-    {
-        return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
-    }
-
     public long getSessionCountLimit()
     {
         return getChannelMax();
@@ -565,4 +578,16 @@ public class ServerConnection extends Co
         super.doHeartBeat();
 
     }
+
+    @Override
+    public void addDeleteTask(final Action<? super ServerConnection> task)
+    {
+        _taskList.add(task);
+    }
+
+    @Override
+    public void removeDeleteTask(final Action<? super ServerConnection> task)
+    {
+        _taskList.remove(task);
+    }
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Mon Feb 17 20:19:36 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -310,14 +311,18 @@ public class ServerConnectionDelegate ex
     private boolean isSessionNameUnique(final byte[] name, final Connection conn)
     {
         final ServerConnection sconn = (ServerConnection) conn;
-        final String userId = sconn.getUserName();
+        final Principal authorizedPrincipal = sconn.getAuthorizedPrincipal();
+        final String userId = authorizedPrincipal == null ? "" : authorizedPrincipal.getName();
 
         final Iterator<AMQConnectionModel> connections =
                         ((ServerConnection)conn).getVirtualHost().getConnectionRegistry().getConnections().iterator();
         while(connections.hasNext())
         {
-            final AMQConnectionModel amqConnectionModel = (AMQConnectionModel) connections.next();
-            if (userId.equals(amqConnectionModel.getUserName()) && !amqConnectionModel.isSessionNameUnique(name))
+            final AMQConnectionModel amqConnectionModel = connections.next();
+            final String userName = amqConnectionModel.getAuthorizedPrincipal() == null
+                    ? ""
+                    : amqConnectionModel.getAuthorizedPrincipal().getName();
+            if (userId.equals(userName) && !amqConnectionModel.isSessionNameUnique(name))
             {
                 return false;
             }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Mon Feb 17 20:19:36 2014
@@ -78,6 +78,7 @@ import org.apache.qpid.server.txn.Suspen
 import org.apache.qpid.server.txn.TimeoutDtxException;
 import org.apache.qpid.server.txn.UnknownDtxBranchException;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.Deletable;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.*;
 import org.slf4j.Logger;
@@ -88,7 +89,9 @@ import static org.apache.qpid.util.Seria
 
 public class ServerSession extends Session
         implements AuthorizationHolder,
-                   AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
+                   AMQSessionModel<ServerSession,ServerConnection>, LogSubject, AsyncAutoCommitTransaction.FutureRecorder,
+                   Deletable<ServerSession>
+
 {
     private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
 
@@ -132,7 +135,7 @@ public class ServerSession extends Sessi
 
     private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
 
-    private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
+    private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>();
 
     private final TransactionTimeoutHelper _transactionTimeoutHelper;
 
@@ -374,7 +377,7 @@ public class ServerSession extends Sessi
         }
         _messageDispositionListenerMap.clear();
 
-        for (Action<ServerSession> task : _taskList)
+        for (Action<? super ServerSession> task : _taskList)
         {
             task.performAction(this);
         }
@@ -610,12 +613,12 @@ public class ServerSession extends Sessi
         return getConnection().getAuthorizedSubject();
     }
 
-    public void addSessionCloseTask(Action<ServerSession> task)
+    public void addDeleteTask(Action<? super ServerSession> task)
     {
         _taskList.add(task);
     }
 
-    public void removeSessionCloseTask(Action<ServerSession> task)
+    public void removeDeleteTask(Action<? super ServerSession> task)
     {
         _taskList.remove(task);
     }
@@ -652,7 +655,7 @@ public class ServerSession extends Sessi
         return _id;
     }
 
-    public AMQConnectionModel getConnectionModel()
+    public ServerConnection getConnectionModel()
     {
         return getConnection();
     }
@@ -922,7 +925,7 @@ public class ServerSession extends Sessi
     }
 
     @Override
-    public int compareTo(AMQSessionModel o)
+    public int compareTo(ServerSession o)
     {
         return getId().compareTo(o.getId());
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Mon Feb 17 20:19:36 2014
@@ -25,6 +25,8 @@ import java.util.LinkedHashMap;
 import java.util.UUID;
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
 import org.apache.qpid.server.exchange.Exchange;
@@ -204,47 +206,12 @@ public class ServerSessionDelegate exten
                 {
                     exception(session,method,ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
                 }
-                else if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
-                {
-                    exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
-                }
-                else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
+                else if(!queue.verifySessionAccess((ServerSession)session))
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
                 else
                 {
-                    if(queue.isExclusive())
-                    {
-                        ServerSession s = (ServerSession) session;
-                        queue.setExclusiveOwningSession(s);
-
-                        ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
-                        {
-                            public void performAction(ServerSession session)
-                            {
-                                if(queue.getExclusiveOwningSession() == session)
-                                {
-                                    queue.setExclusiveOwningSession(null);
-                                }
-                            }
-                        });
-
-                        if(queue.getAuthorizationHolder() == null)
-                        {
-                            queue.setAuthorizationHolder(s);
-                            ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
-                            {
-                                public void performAction(ServerSession session)
-                                {
-                                    if(queue.getAuthorizationHolder() == session)
-                                    {
-                                        queue.setAuthorizationHolder(null);
-                                    }
-                                }
-                            });
-                        }
-                    }
 
                     FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
 
@@ -302,6 +269,10 @@ public class ServerSessionDelegate exten
                     {
                         exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
                     }
+                    catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+                    {
+                        exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy");
+                    }
                 }
             }
         }
@@ -1197,7 +1168,7 @@ public class ServerSessionDelegate exten
                 exception(session, method, errorCode, description);
 
             }
-            else if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+            else if (!queue.verifySessionAccess((ServerSession)session))
             {
                 String description = "Cannot declare queue('" + queueName + "'),"
                                                                        + " as exclusive queue with same name "
@@ -1214,7 +1185,6 @@ public class ServerSessionDelegate exten
             try
             {
 
-                String owner = method.getExclusive() ? ((ServerSession)session).getClientID() : null;
                 final String alternateExchangeName = method.getAlternateExchange();
 
 
@@ -1227,66 +1197,36 @@ public class ServerSessionDelegate exten
 
                 final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName());
 
-                final boolean deleteOnNoConsumer = !exclusive && autoDelete;
+                arguments.put(Queue.ID, id);
+                arguments.put(Queue.NAME, queueName);
 
-                queue = virtualHost.createQueue(id, queueName, method.getDurable(), owner,
-                                                autoDelete, exclusive, deleteOnNoConsumer,
-                                                arguments);
-
-                if (autoDelete && exclusive)
+                LifetimePolicy lifetime;
+                if(autoDelete)
                 {
-                    final AMQQueue q = queue;
-                    final Action<ServerSession> deleteQueueTask = new Action<ServerSession>()
-                        {
-                            public void performAction(ServerSession session)
-                            {
-                                try
-                                {
-                                    virtualHost.removeQueue(q);
-                                }
-                                catch (QpidSecurityException e)
-                                {
-                                    exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
-                                }
-                            }
-                        };
-                    final ServerSession s = (ServerSession) session;
-                    s.addSessionCloseTask(deleteQueueTask);
-                    queue.addQueueDeleteTask(new Action<AMQQueue>()
-                        {
-                            public void performAction(AMQQueue queue)
-                            {
-                                s.removeSessionCloseTask(deleteQueueTask);
-                            }
-                        });
+                    lifetime = exclusive ? LifetimePolicy.DELETE_ON_SESSION_END
+                            : LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS;
                 }
-                if (exclusive)
+                else
                 {
-                    final AMQQueue q = queue;
-                    final Action<ServerSession> removeExclusive = new Action<ServerSession>()
-                    {
-                        public void performAction(ServerSession session)
-                        {
-                            q.setAuthorizationHolder(null);
-                            q.setExclusiveOwningSession(null);
-                        }
-                    };
-                    final ServerSession s = (ServerSession) session;
-                    q.setExclusiveOwningSession(s);
-                    s.addSessionCloseTask(removeExclusive);
-                    queue.addQueueDeleteTask(new Action<AMQQueue>()
-                    {
-                        public void performAction(AMQQueue queue)
-                        {
-                            s.removeSessionCloseTask(removeExclusive);
-                        }
-                    });
+                    lifetime = LifetimePolicy.PERMANENT;
                 }
+
+                arguments.put(Queue.LIFETIME_POLICY, lifetime);
+
+                ExclusivityPolicy exclusivityPolicy = exclusive ? ExclusivityPolicy.SESSION : ExclusivityPolicy.NONE;
+
+
+                arguments.put(Queue.DURABLE, method.getDurable());
+
+                arguments.put(Queue.EXCLUSIVE, exclusivityPolicy);
+
+                queue = virtualHost.createQueue((ServerSession)session, arguments);
+
             }
             catch(QueueExistsException qe)
             {
                 queue = qe.getExistingQueue();
-                if (exclusive && (queue.getExclusiveOwningSession() != null && !queue.getExclusiveOwningSession().equals(session)))
+                if (!queue.verifySessionAccess((ServerSession)session))
                 {
                     String description = "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "
@@ -1347,11 +1287,7 @@ public class ServerSessionDelegate exten
             }
             else
             {
-                if(queue.getAuthorizationHolder() != null && queue.getAuthorizationHolder() != session)
-                {
-                    exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
-                }
-                else if(queue.isExclusive() && queue.getExclusiveOwningSession()  != null && queue.getExclusiveOwningSession() != session)
+                if(!queue.verifySessionAccess((ServerSession)session))
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
@@ -1424,7 +1360,7 @@ public class ServerSessionDelegate exten
             result.setQueue(queue.getName());
             result.setDurable(queue.isDurable());
             result.setExclusive(queue.isExclusive());
-            result.setAutoDelete(queue.isAutoDelete());
+            result.setAutoDelete(queue.getLifetimePolicy() != LifetimePolicy.PERMANENT);
             Map<String, Object> arguments = new LinkedHashMap<String, Object>();
             Collection<String> availableAttrs = queue.getAvailableAttributes();
 
@@ -1500,7 +1436,6 @@ public class ServerSessionDelegate exten
     public void closed(Session session)
     {
         setThreadSubject(session);
-
         ServerSession serverSession = (ServerSession)session;
 
         serverSession.stopSubscriptions();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Feb 17 20:19:36 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -30,6 +31,8 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQMethodBody;
@@ -86,7 +89,9 @@ import org.apache.qpid.server.util.Conne
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.TransportException;
 
-public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
+public class AMQChannel<T extends AMQProtocolSession<T>>
+        implements AMQSessionModel<AMQChannel<T>,T>,
+                   AsyncAutoCommitTransaction.FutureRecorder
 {
     public static final int DEFAULT_PREFETCH = 4096;
 
@@ -140,7 +145,7 @@ public class AMQChannel implements AMQSe
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
 
-    private final AMQProtocolSession _session;
+    private final T _session;
     private AtomicBoolean _closing = new AtomicBoolean(false);
 
     private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -163,12 +168,15 @@ public class AMQChannel implements AMQSe
     private final TransactionTimeoutHelper _transactionTimeoutHelper;
     private final UUID _id = UUID.randomUUID();
 
+    private final List<Action<? super AMQChannel<T>>> _taskList =
+            new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>();
+
 
     private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
     private final ImmediateAction _immediateAction = new ImmediateAction();
 
 
-    public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
+    public AMQChannel(T session, int channelId, MessageStore messageStore)
             throws AMQException
     {
         _session = session;
@@ -526,7 +534,8 @@ public class AMQChannel implements AMQSe
     public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
                                             FieldTable filters, boolean exclusive, boolean noLocal)
             throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
-                   MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException
+                   MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
+                   MessageSource.ConsumerAccessRefused
     {
         if (tag == null)
         {
@@ -580,7 +589,15 @@ public class AMQChannel implements AMQSe
                 {
                     filterManager = new SimpleFilterManager();
                 }
-                filterManager.add(new FilterSupport.NoLocalFilter(source));
+                final Object connectionReference = getConnectionReference();
+                filterManager.add(new MessageFilter()
+                {
+                    @Override
+                    public boolean matches(final Filterable message)
+                    {
+                        return message.getConnectionReference() != connectionReference;
+                    }
+                });
             }
             Consumer sub =
                     source.addConsumer(target,
@@ -609,6 +626,11 @@ public class AMQChannel implements AMQSe
             _tag2SubscriptionTargetMap.remove(tag);
             throw e;
         }
+        catch (MessageSource.ConsumerAccessRefused e)
+        {
+            _tag2SubscriptionTargetMap.remove(tag);
+            throw e;
+        }
         return tag;
     }
 
@@ -657,6 +679,13 @@ public class AMQChannel implements AMQSe
         CurrentActor.get().message(_logSubject, operationalLogMessage);
 
         unsubscribeAllConsumers();
+
+        for (Action<? super AMQChannel<T>> task : _taskList)
+        {
+            task.performAction(this);
+        }
+
+
         _transaction.rollback();
 
         try
@@ -692,9 +721,10 @@ public class AMQChannel implements AMQSe
 
             Consumer sub = me.getValue().getConsumer();
 
-
-            sub.close();
-
+            if(sub != null)
+            {
+                sub.close();
+            }
         }
 
         _tag2SubscriptionTargetMap.clear();
@@ -1192,7 +1222,8 @@ public class AMQChannel implements AMQSe
         return _id;
     }
 
-    public AMQConnectionModel getConnectionModel()
+    @Override
+    public T getConnectionModel()
     {
         return _session;
     }
@@ -1208,11 +1239,23 @@ public class AMQChannel implements AMQSe
     }
 
     @Override
-    public int compareTo(AMQSessionModel o)
+    public int compareTo(AMQChannel o)
     {
         return getId().compareTo(o.getId());
     }
 
+    @Override
+    public void addDeleteTask(final Action<? super AMQChannel<T>> task)
+    {
+        _taskList.add(task);
+    }
+
+    @Override
+    public void removeDeleteTask(final Action<? super AMQChannel<T>> task)
+    {
+        _taskList.remove(task);
+    }
+
 
     private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>>
     {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Mon Feb 17 20:19:36 2014
@@ -33,7 +33,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -47,18 +46,15 @@ import org.apache.qpid.AMQChannelExcepti
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
 import org.apache.qpid.server.logging.LogActor;
@@ -78,6 +74,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -86,7 +83,7 @@ import org.apache.qpid.transport.Transpo
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.util.BytesDataOutput;
 
-public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession
+public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
@@ -103,9 +100,11 @@ public class AMQProtocolEngine implement
 
     private VirtualHost _virtualHost;
 
-    private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
+    private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
+            new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
 
-    private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+    @SuppressWarnings("unchecked")
+    private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
 
     /**
      * The channels that the latest call to {@link #received(ByteBuffer)} applied to.
@@ -114,9 +113,8 @@ public class AMQProtocolEngine implement
      *
      * Thread-safety: guarded by {@link #_receivedLock}.
      */
-    private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<AMQChannel>();
-
-    private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
+    private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage =
+            new HashSet<AMQChannel<AMQProtocolEngine>>();
 
     private final AMQStateManager _stateManager;
 
@@ -124,10 +122,6 @@ public class AMQProtocolEngine implement
 
     private SaslServer _saslServer;
 
-    private Object _lastReceived;
-
-    private Object _lastSent;
-
     private volatile boolean _closed;
 
     // maximum number of channels this session should have
@@ -136,8 +130,8 @@ public class AMQProtocolEngine implement
     /* AMQP Version for this session */
     private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
     private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
-    private FieldTable _clientProperties;
-    private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+    private final List<Action<? super AMQProtocolEngine>> _taskList =
+            new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
 
     private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
     private ProtocolOutputConverter _protocolOutputConverter;
@@ -153,12 +147,9 @@ public class AMQProtocolEngine implement
     private long _lastIoTime;
 
     private long _writtenBytes;
-    private long _readBytes;
-
 
     private long _maxFrameSize;
     private final AtomicBoolean _closing = new AtomicBoolean(false);
-    private long _createTime = System.currentTimeMillis();
 
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
@@ -176,6 +167,7 @@ public class AMQProtocolEngine implement
 
     private volatile boolean _closeWhenNoRoute;
     private volatile boolean _stopped;
+    private long _readBytes;
 
     public AMQProtocolEngine(Broker broker,
                              NetworkConnection network,
@@ -258,15 +250,14 @@ public class AMQProtocolEngine implement
         final long arrivalTime = System.currentTimeMillis();
         _lastReceivedTime = arrivalTime;
         _lastIoTime = arrivalTime;
+        _readBytes += msg.remaining();
 
         _receivedLock.lock();
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
-            final int len = dataBlocks.size();
-            for (int i = 0; i < len; i++)
+            for (AMQDataBlock dataBlock : dataBlocks)
             {
-                AMQDataBlock dataBlock = dataBlocks.get(i);
                 try
                 {
                     dataBlockReceived(dataBlock);
@@ -316,7 +307,7 @@ public class AMQProtocolEngine implement
 
     private void receivedComplete()
     {
-        for (AMQChannel channel : _channelsForCurrentMessage)
+        for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage)
         {
             channel.receivedComplete();
         }
@@ -334,7 +325,6 @@ public class AMQProtocolEngine implement
      */
     private void dataBlockReceived(AMQDataBlock message) throws Exception
     {
-        _lastReceived = message;
         if (message instanceof ProtocolInitiation)
         {
             protocolInitiationReceived((ProtocolInitiation) message);
@@ -363,7 +353,7 @@ public class AMQProtocolEngine implement
     private void frameReceived(AMQFrame frame) throws AMQException
     {
         int channelId = frame.getChannel();
-        AMQChannel amqChannel = _channelMap.get(channelId);
+        AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId);
         if(amqChannel != null)
         {
             // The _receivedLock is already acquired in the caller
@@ -558,14 +548,6 @@ public class AMQProtocolEngine implement
             {
                 boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
 
-                if (!_frameListeners.isEmpty())
-                {
-                    for (AMQMethodListener listener : _frameListeners)
-                    {
-                        wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
-                    }
-                }
-
                 if (!wasAnyoneInterested)
                 {
                     throw new AMQNoMethodHandlerException(evt);
@@ -611,11 +593,6 @@ public class AMQProtocolEngine implement
         }
         catch (Exception e)
         {
-            for (AMQMethodListener listener : _frameListeners)
-            {
-                listener.error(e);
-            }
-
             _logger.error("Unexpected exception while processing frame.  Closing connection.", e);
 
             closeProtocolSession();
@@ -625,7 +602,7 @@ public class AMQProtocolEngine implement
     public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
     {
 
-        AMQChannel channel = getAndAssertChannel(channelId);
+        AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
 
         channel.publishContentHeader(body);
 
@@ -633,7 +610,7 @@ public class AMQProtocolEngine implement
 
     public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
     {
-        AMQChannel channel = getAndAssertChannel(channelId);
+        AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
 
         channel.publishContentBody(body);
     }
@@ -681,17 +658,17 @@ public class AMQProtocolEngine implement
         _contextKey = contextKey;
     }
 
-    public List<AMQChannel> getChannels()
+    public List<AMQChannel<AMQProtocolEngine>> getChannels()
     {
         synchronized (_channelMap)
         {
-            return new ArrayList<AMQChannel>(_channelMap.values());
+            return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values());
         }
     }
 
-    public AMQChannel getAndAssertChannel(int channelId) throws AMQException
+    public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException
     {
-        AMQChannel channel = getChannel(channelId);
+        AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
         if (channel == null)
         {
             throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
@@ -700,9 +677,9 @@ public class AMQProtocolEngine implement
         return channel;
     }
 
-    public AMQChannel getChannel(int channelId)
+    public AMQChannel<AMQProtocolEngine> getChannel(int channelId)
     {
-        final AMQChannel channel =
+        final AMQChannel<AMQProtocolEngine> channel =
                 ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
         if ((channel == null) || channel.isClosing())
         {
@@ -719,7 +696,7 @@ public class AMQProtocolEngine implement
         return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
     }
 
-    public void addChannel(AMQChannel channel) throws AMQException
+    public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException
     {
         if (_closed)
         {
@@ -770,7 +747,7 @@ public class AMQProtocolEngine implement
         _maxNoOfChannels = value;
     }
 
-    public void commitTransactions(AMQChannel channel) throws AMQException
+    public void commitTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
     {
         if ((channel != null) && channel.isTransactional())
         {
@@ -778,7 +755,7 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public void rollbackTransactions(AMQChannel channel) throws AMQException
+    public void rollbackTransactions(AMQChannel<AMQProtocolEngine> channel) throws AMQException
     {
         if ((channel != null) && channel.isTransactional())
         {
@@ -802,7 +779,7 @@ public class AMQProtocolEngine implement
 
     public void closeChannel(int channelId, AMQConstant cause, String message)
     {
-        final AMQChannel channel = getChannel(channelId);
+        final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
         if (channel == null)
         {
             throw new IllegalArgumentException("Unknown channel id");
@@ -879,12 +856,10 @@ public class AMQProtocolEngine implement
 
     /**
      * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel.
-     *
-     * @throws AMQException if an error occurs while closing any channel
      */
     private void closeAllChannels()
     {
-        for (AMQChannel channel : getChannels())
+        for (AMQChannel<AMQProtocolEngine> channel : getChannels())
         {
             channel.close();
         }
@@ -929,9 +904,9 @@ public class AMQProtocolEngine implement
 
                 closeAllChannels();
 
-                for (Task task : _taskList)
+                for (Action<? super AMQProtocolEngine> task : _taskList)
                 {
-                    task.doTask(this);
+                    task.performAction(this);
                 }
 
                 synchronized(this)
@@ -961,7 +936,7 @@ public class AMQProtocolEngine implement
                     }
                     catch (InterruptedException e)
                     {
-
+                        // do nothing
                     }
                     finally
                     {
@@ -1027,11 +1002,6 @@ public class AMQProtocolEngine implement
         return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")");
     }
 
-    public String dump()
-    {
-        return this + " last_sent=" + _lastSent + " last_received=" + _lastReceived;
-    }
-
     /** @return an object that can be used to identity */
     public Object getKey()
     {
@@ -1069,10 +1039,9 @@ public class AMQProtocolEngine implement
 
     public void setClientProperties(FieldTable clientProperties)
     {
-        _clientProperties = clientProperties;
-        if (_clientProperties != null)
+        if (clientProperties != null)
         {
-            String closeWhenNoRoute = _clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
+            String closeWhenNoRoute = clientProperties.getString(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE);
             if (closeWhenNoRoute != null)
             {
                 _closeWhenNoRoute = Boolean.parseBoolean(closeWhenNoRoute);
@@ -1082,10 +1051,10 @@ public class AMQProtocolEngine implement
                 }
             }
 
-            _clientVersion = _clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
-            _clientProduct = _clientProperties.getString(ConnectionStartProperties.PRODUCT);
+            _clientVersion = clientProperties.getString(ConnectionStartProperties.VERSION_0_8);
+            _clientProduct = clientProperties.getString(ConnectionStartProperties.PRODUCT);
 
-            String clientId = _clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8);
+            String clientId = clientProperties.getString(ConnectionStartProperties.CLIENT_ID_0_8);
             if (clientId != null)
             {
                 setContextKey(new AMQShortString(clientId));
@@ -1118,11 +1087,6 @@ public class AMQProtocolEngine implement
         return _protocolVersion.getMinorVersion();
     }
 
-    public boolean isProtocolVersion(byte major, byte minor)
-    {
-        return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
-    }
-
     public MethodRegistry getRegistry()
     {
         return getMethodRegistry();
@@ -1141,12 +1105,12 @@ public class AMQProtocolEngine implement
 
     }
 
-    public void addSessionCloseTask(Task task)
+    public void addDeleteTask(Action<? super AMQProtocolEngine> task)
     {
         _taskList.add(task);
     }
 
-    public void removeSessionCloseTask(Task task)
+    public void removeDeleteTask(Action<? super AMQProtocolEngine> task)
     {
         _taskList.remove(task);
     }
@@ -1341,51 +1305,11 @@ public class AMQProtocolEngine implement
         return _clientProduct;
     }
 
-    public String getPrincipalAsString()
-    {
-        return getAuthId();
-    }
-
     public long getSessionCountLimit()
     {
         return getMaximumNumberOfChannels();
     }
 
-    public Boolean isIncoming()
-    {
-        return true;
-    }
-
-    public Boolean isSystemConnection()
-    {
-        return false;
-    }
-
-    public Boolean isFederationLink()
-    {
-        return false;
-    }
-
-    public String getAuthId()
-    {
-        return getAuthorizedPrincipal() == null ? null : getAuthorizedPrincipal().getName();
-    }
-
-    public Integer getRemotePID()
-    {
-        return null;
-    }
-
-    public String getRemoteProcessName()
-    {
-        return null;
-    }
-
-    public Integer getRemoteParentPID()
-    {
-        return null;
-    }
-
     public boolean isDurable()
     {
         return false;
@@ -1401,52 +1325,6 @@ public class AMQProtocolEngine implement
         return String.valueOf(getRemoteAddress());
     }
 
-    public long getCreateTime()
-    {
-        return _createTime;
-    }
-
-    public Boolean isShadow()
-    {
-        return false;
-    }
-
-    public void mgmtClose()
-    {
-        MethodRegistry methodRegistry = getMethodRegistry();
-        ConnectionCloseBody responseBody =
-                methodRegistry.createConnectionCloseBody(
-                        AMQConstant.REPLY_SUCCESS.getCode(),
-                        new AMQShortString("The connection was closed using the broker's management interface."),
-                        0,0);
-
-        // This seems ugly but because we use closeConnection in both normal
-        // broker operation and as part of the management interface it cannot
-        // be avoided. The Current Actor will be null when this method is
-        // called via the QMF management interface. As such we need to set one.
-        boolean removeActor = false;
-        if (CurrentActor.get() == null)
-        {
-            removeActor = true;
-            CurrentActor.set(new ManagementActor(_actor.getRootMessageLogger()));
-        }
-
-        try
-        {
-            writeFrame(responseBody.generateFrame(0));
-
-            closeSession();
-
-        }
-        finally
-        {
-            if (removeActor)
-            {
-                CurrentActor.remove();
-            }
-        }
-    }
-
     public void mgmtCloseChannel(int channelId)
     {
         MethodRegistry methodRegistry = getMethodRegistry();
@@ -1481,14 +1359,9 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public String getClientID()
-    {
-        return getContextKey().toString();
-    }
-
-    public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
+    public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message)
     {
-        int channelId = ((AMQChannel)session).getChannelId();
+        int channelId = session.getChannelId();
         closeChannel(channelId, cause, message);
 
         MethodRegistry methodRegistry = getMethodRegistry();
@@ -1506,7 +1379,7 @@ public class AMQProtocolEngine implement
         closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
 		                getProtocolOutputConverter().getProtocolMajorVersion(),
 		                getProtocolOutputConverter().getProtocolMinorVersion(),
-		                (Throwable) null));
+		                null));
     }
 
     public void block()
@@ -1516,7 +1389,7 @@ public class AMQProtocolEngine implement
             if(!_blocking)
             {
                 _blocking = true;
-                for(AMQChannel channel : _channelMap.values())
+                for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
                 {
                     channel.block();
                 }
@@ -1531,7 +1404,7 @@ public class AMQProtocolEngine implement
             if(_blocking)
             {
                 _blocking = false;
-                for(AMQChannel channel : _channelMap.values())
+                for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
                 {
                     channel.unblock();
                 }
@@ -1544,9 +1417,9 @@ public class AMQProtocolEngine implement
         return _closed;
     }
 
-    public List<AMQSessionModel> getSessionModels()
+    public List<AMQChannel<AMQProtocolEngine>> getSessionModels()
     {
-		return new ArrayList<AMQSessionModel>(getChannels());
+		return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels());
     }
 
     public LogSubject getLogSubject()
@@ -1620,14 +1493,15 @@ public class AMQProtocolEngine implement
         return String.valueOf(getContextKey());
     }
 
-    public void setDeferFlush(boolean deferFlush)
+    @Override
+    public String getRemoteContainerName()
     {
-        _deferFlush = deferFlush;
+        return String.valueOf(getContextKey());
     }
 
-    public String getUserName()
+    public void setDeferFlush(boolean deferFlush)
     {
-        return getAuthorizedPrincipal().getName();
+        _deferFlush = deferFlush;
     }
 
     public final class WriteDeliverMethod

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java Mon Feb 17 20:19:36 2014
@@ -37,12 +37,14 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel
+public interface AMQProtocolSession<T extends AMQProtocolSession<T>>
+        extends AMQVersionAwareProtocolSession, AuthorizationHolder, AMQConnectionModel<T,AMQChannel<T>>
 {
     long getSessionID();
 
@@ -69,11 +71,6 @@ public interface AMQProtocolSession exte
      */
     SocketAddress getLocalAddress();
 
-    public static interface Task
-    {
-        public void doTask(AMQProtocolSession session);
-    }
-
     /**
      * Get the context key associated with this session. Context key is described in the AMQ protocol specification (RFC
      * 6).
@@ -98,7 +95,7 @@ public interface AMQProtocolSession exte
      *
      * @return null if no channel exists, the channel otherwise
      */
-    AMQChannel getChannel(int channelId);
+    AMQChannel<T> getChannel(int channelId);
 
     /**
      * Associate a channel with this session.
@@ -106,7 +103,7 @@ public interface AMQProtocolSession exte
      * @param channel the channel to associate with this session. It is an error to associate the same channel with more
      *                than one session but this is not validated.
      */
-    void addChannel(AMQChannel channel) throws AMQException;
+    void addChannel(AMQChannel<T> channel) throws AMQException;
 
     /**
      * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
@@ -185,10 +182,6 @@ public interface AMQProtocolSession exte
 
     void setVirtualHost(VirtualHost virtualHost) throws AMQException;
 
-    void addSessionCloseTask(Task task);
-
-    void removeSessionCloseTask(Task task);
-
     public ProtocolOutputConverter getProtocolOutputConverter();
 
     void setAuthorizedSubject(Subject authorizedSubject);
@@ -209,11 +202,11 @@ public interface AMQProtocolSession exte
 
     void setMaximumNumberOfChannels(Long value);
 
-    void commitTransactions(AMQChannel channel) throws AMQException;
+    void commitTransactions(AMQChannel<T> channel) throws AMQException;
 
-    void rollbackTransactions(AMQChannel channel) throws AMQException;
+    void rollbackTransactions(AMQChannel<T> channel) throws AMQException;
 
-    List<AMQChannel> getChannels();
+    List<AMQChannel<T>> getChannels();
 
     void mgmtCloseChannel(int channelId);
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Mon Feb 17 20:19:36 2014
@@ -99,16 +99,6 @@ public class BasicConsumeMethodHandler i
             {
                 final AMQShortString consumerTagName;
 
-                if (queue.isExclusive() && !queue.isDurable())
-                {
-                    AMQSessionModel session = queue.getExclusiveOwningSession();
-                    if (session == null || session.getConnectionModel() != protocolConnection)
-                    {
-                        throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                          "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
-                    }
-                }
-
                 if (body.getConsumerTag() != null)
                 {
                     consumerTagName = body.getConsumerTag().intern(false);
@@ -184,6 +174,13 @@ public class BasicConsumeMethodHandler i
                                                    + queue.getName()
                                                    + " permission denied");
                 }
+                catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+                {
+                    throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
+                                                   "Cannot subscribe to queue "
+                                                   + queue.getName()
+                                                   + " as it already has an incompatible exclusivity policy");
+                }
 
             }
         }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Mon Feb 17 20:19:36 2014
@@ -98,15 +98,6 @@ public class BasicGetMethodHandler imple
             }
             else
             {
-                if (queue.isExclusive())
-                {
-                    AMQSessionModel session = queue.getExclusiveOwningSession();
-                    if (session == null || session.getConnectionModel() != protocolConnection)
-                    {
-                        throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                          "Queue is exclusive, but not created on this Connection.");
-                    }
-                }
 
                 try
                 {
@@ -136,6 +127,11 @@ public class BasicGetMethodHandler imple
                                                       "The GET request has been evaluated as an exclusive consumer, " +
                                                       "this is likely due to a programming error in the Qpid broker");
                 }
+                catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                                      "Queue has an incompatible exclusivity policy");
+                }
             }
         }
     }
@@ -145,7 +141,7 @@ public class BasicGetMethodHandler imple
                                      final AMQChannel channel,
                                      final boolean acks)
             throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
-                   MessageSource.ExistingExclusiveConsumer
+                   MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
     {
 
         final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java Mon Feb 17 20:19:36 2014
@@ -116,15 +116,6 @@ public class QueueBindHandler implements
 
         try
         {
-            if (queue.isExclusive() && !queue.isDurable())
-            {
-                AMQSessionModel session = queue.getExclusiveOwningSession();
-                if (session == null || session.getConnectionModel() != protocolConnection)
-                {
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
-                }
-            }
 
             Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments());
             String bindingKey = String.valueOf(routingKey);
@@ -144,10 +135,6 @@ public class QueueBindHandler implements
                 }
             }
         }
-        catch (AMQException e)
-        {
-            throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
-        }
         catch (QpidSecurityException e)
         {
             throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java Mon Feb 17 20:19:36 2014
@@ -29,6 +29,9 @@ import org.apache.qpid.framing.MethodReg
 import org.apache.qpid.framing.QueueDeclareBody;
 import org.apache.qpid.framing.QueueDeclareOkBody;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -39,7 +42,6 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.security.QpidSecurityException;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
@@ -96,9 +98,7 @@ public class QueueDeclareHandler impleme
             }
             else
             {
-                AMQSessionModel owningSession = queue.getExclusiveOwningSession();
-                if (queue.isExclusive() && !queue.isDurable()
-                    && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+                if (!queue.verifySessionAccess(channel))
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
                                                       "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
@@ -114,42 +114,15 @@ public class QueueDeclareHandler impleme
             try
             {
 
-                queue = createQueue(queueName, body, virtualHost, protocolConnection);
-                queue.setAuthorizationHolder(protocolConnection);
-
-                if (body.getExclusive())
-                {
-                    queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
-                    queue.setAuthorizationHolder(protocolConnection);
-
-                    if(!body.getDurable())
-                    {
-                        final AMQQueue q = queue;
-                        final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
-                        {
-                            public void doTask(AMQProtocolSession session)
-                            {
-                                q.setExclusiveOwningSession(null);
-                            }
-                        };
-                        protocolConnection.addSessionCloseTask(sessionCloseTask);
-                        queue.addQueueDeleteTask(new Action<AMQQueue>() {
-                            public void performAction(AMQQueue queue)
-                            {
-                                protocolConnection.removeSessionCloseTask(sessionCloseTask);
-                            }
-                        });
-                    }
-                }
+                queue = createQueue(channel, queueName, body, virtualHost, protocolConnection);
 
             }
             catch(QueueExistsException qe)
             {
 
                 queue = qe.getExistingQueue();
-                AMQSessionModel owningSession = queue.getExclusiveOwningSession();
 
-                if (queue.isExclusive() && !queue.isDurable() && (owningSession == null || owningSession.getConnectionModel() != protocolConnection))
+                if (!queue.verifySessionAccess(channel))
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
                                                       "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");
@@ -161,19 +134,12 @@ public class QueueDeclareHandler impleme
                             "Cannot re-declare queue '" + queue.getName() + "' with different exclusivity (was: "
                             + queue.isExclusive() + " requested " + body.getExclusive() + ")");
                 }
-                else if (body.getExclusive() && !(queue.isDurable() ? String.valueOf(queue.getOwner()).equals(session.getClientID()) : (owningSession == null || owningSession.getConnectionModel() == protocolConnection)))
-                {
-                    throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
-                                                                               + "as exclusive queue with same name "
-                                                                               + "declared on another client ID('"
-                                                                               + queue.getOwner() + "') your clientID('" + session.getClientID() + "')");
-
-                }
-                else if(queue.isAutoDelete() != body.getAutoDelete())
+                else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+                    || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)))
                 {
                     throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
-                                                      "Cannot re-declare queue '" + queue.getName() + "' with different auto-delete (was: "
-                                                        + queue.isAutoDelete() + " requested " + body.getAutoDelete() + ")");
+                                                      "Cannot re-declare queue '" + queue.getName() + "' with different lifetime policy (was: "
+                                                        + queue.getLifetimePolicy() + " requested autodelete: " + body.getAutoDelete() + ")");
                 }
                 else if(queue.isDurable() != body.getDurable())
                 {
@@ -211,7 +177,7 @@ public class QueueDeclareHandler impleme
         return new AMQShortString("tmp_" + UUID.randomUUID());
     }
 
-    protected AMQQueue createQueue(final AMQShortString queueName,
+    protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName,
                                    QueueDeclareBody body,
                                    final VirtualHost virtualHost,
                                    final AMQProtocolSession session)
@@ -222,47 +188,35 @@ public class QueueDeclareHandler impleme
         final boolean autoDelete = body.getAutoDelete();
         final boolean exclusive = body.getExclusive();
 
-        String owner = exclusive ? AMQShortString.toString(session.getContextKey()) : null;
 
-        Map<String, Object> arguments =
+        Map<String, Object> attributes =
                 QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments()));
-        String queueNameString = AMQShortString.toString(queueName);
-        final UUID id = UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName());
-
-        final AMQQueue queue = virtualHost.createQueue(id, queueNameString, durable, owner, autoDelete,
-                exclusive, autoDelete, arguments);
-
-        if (exclusive && !durable)
+        final String queueNameString = AMQShortString.toString(queueName);
+        attributes.put(Queue.NAME, queueNameString);
+        attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()));
+        attributes.put(Queue.DURABLE, durable);
+
+        LifetimePolicy lifetimePolicy;
+        ExclusivityPolicy exclusivityPolicy;
+
+        if(exclusive)
+        {
+            lifetimePolicy = autoDelete
+                    ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+                    : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+            exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
+        }
+        else
         {
-            final AMQProtocolSession.Task deleteQueueTask =
-                    new AMQProtocolSession.Task()
-                    {
-                        public void doTask(AMQProtocolSession session)
-                        {
-                            if (virtualHost.getQueue(queueName.toString()) == queue)
-                            {
-                                try
-                                {
-                                    virtualHost.removeQueue(queue);
-                                }
-                                catch (QpidSecurityException e)
-                                {
-                                    throw new ConnectionScopedRuntimeException("Permission exception: Unable to remove a temporary queue created by a session which has now removed itself", e);
-                                }
-                            }
-                        }
-                    };
+            lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+            exclusivityPolicy = ExclusivityPolicy.NONE;
+        }
 
-            session.addSessionCloseTask(deleteQueueTask);
+        attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+        attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
 
-            queue.addQueueDeleteTask(new Action<AMQQueue>()
-            {
-                public void performAction(AMQQueue queue)
-                {
-                    session.removeSessionCloseTask(deleteQueueTask);
-                }
-            });
-        }
+
+        final AMQQueue queue = virtualHost.createQueue(channel, attributes);
 
         return queue;
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java?rev=1569102&r1=1569101&r2=1569102&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeleteHandler.java Mon Feb 17 20:19:36 2014
@@ -105,8 +105,7 @@ public class QueueDeleteHandler implemen
             }
             else
             {
-                AMQSessionModel session = queue.getExclusiveOwningSession();
-                if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection))
+                if (!queue.verifySessionAccess(channel))
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
                                                       "Queue " + queue.getName() + " is exclusive, but not created on this Connection.");



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org