You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/07 17:57:52 UTC

svn commit: r1565726 [4/6] - in /qpid/trunk/qpid/java: ./ amqp-1-0-client-jms/ amqp-1-0-client/ amqp-1-0-common/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrad...

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Fri Feb  7 16:57:49 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.txn;
 
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MockAMQQueue;
@@ -47,7 +48,7 @@ public class AutoCommitTransactionTest e
     private MessageStore _transactionLog;
     private AMQQueue _queue;
     private List<AMQQueue> _queues;
-    private Collection<QueueEntry> _queueEntries;
+    private Collection<MessageInstance> _queueEntries;
     private ServerMessage _message;
     private MockAction _action;
     private MockStoreTransaction _storeTransaction;
@@ -373,9 +374,9 @@ public class AutoCommitTransactionTest e
         assertFalse("Rollback action must be fired",  _action.isRollbackActionFired());
     }  
     
-    private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
+    private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
     {
-        Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+        Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>();
         
         assertTrue("Boolean arrays must be the same length", queueDurableFlags.length == messagePersistentFlags.length);
         

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Fri Feb  7 16:57:49 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.txn;
 
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MockAMQQueue;
@@ -46,7 +47,7 @@ public class LocalTransactionTest extend
     
     private AMQQueue _queue;
     private List<AMQQueue> _queues;
-    private Collection<QueueEntry> _queueEntries;
+    private Collection<MessageInstance> _queueEntries;
     private ServerMessage _message;
     private MockAction _action1;
     private MockAction _action2;
@@ -597,9 +598,9 @@ public class LocalTransactionTest extend
         assertEquals("Transaction update time should be reset after rollback", 0, _transaction.getTransactionUpdateTime());
     }
 
-    private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
+    private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
     {
-        Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+        Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>();
         
         assertTrue("Boolean arrays must be the same length", queueDurableFlags.length == messagePersistentFlags.length);
         

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Fri Feb  7 16:57:49 2014
@@ -27,6 +27,8 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -127,6 +129,12 @@ public class MockVirtualHost implements 
     }
 
     @Override
+    public MessageSource getMessageSource(final String name)
+    {
+        return null;
+    }
+
+    @Override
     public AMQQueue getQueue(UUID id)
     {
         return null;
@@ -174,6 +182,12 @@ public class MockVirtualHost implements 
     }
 
     @Override
+    public MessageDestination getMessageDestination(final String name)
+    {
+        return null;
+    }
+
+    @Override
     public Exchange getExchange(String name)
     {
         return null;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Fri Feb  7 16:57:49 2014
@@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
 
 
 class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -30,21 +30,20 @@ class ExplicitAcceptDispositionChangeLis
     private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class);
 
 
-    private final QueueEntry _entry;
-    private final Subscription_0_10 _sub;
+    private final MessageInstance _entry;
+    private final ConsumerTarget_0_10 _target;
 
-    public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+    public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
     {
         _entry = entry;
-        _sub = subscription_0_10;
+        _target = target;
     }
 
     public void onAccept()
     {
-        final Subscription_0_10 subscription = getSubscription();
-        if(subscription != null && _entry.isAcquiredBy(_sub))
+        if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
         {
-            subscription.getSessionModel().acknowledge(subscription, _entry);
+            _target.getSessionModel().acknowledge(_target, _entry);
         }
         else
         {
@@ -55,10 +54,9 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onRelease(boolean setRedelivered)
     {
-        final Subscription_0_10 subscription = getSubscription();
-        if(subscription != null && _entry.isAcquiredBy(_sub))
+        if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
         {
-            subscription.release(_entry, setRedelivered);
+            _target.release(_entry, setRedelivered);
         }
         else
         {
@@ -68,10 +66,9 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onReject()
     {
-        final Subscription_0_10 subscription = getSubscription();
-        if(subscription != null && _entry.isAcquiredBy(_sub))
+        if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
         {
-            subscription.reject(_entry);
+            _target.reject(_entry);
         }
         else
         {
@@ -82,12 +79,8 @@ class ExplicitAcceptDispositionChangeLis
 
     public boolean acquire()
     {
-        return _entry.acquire(getSubscription());
+        return _entry.acquire(_target.getConsumer());
     }
 
 
-    private Subscription_0_10 getSubscription()
-    {
-        return _sub;
-    }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Fri Feb  7 16:57:49 2014
@@ -22,20 +22,20 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
 
 class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
 {
     private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class);
 
 
-    private final QueueEntry _entry;
-    private Subscription_0_10 _sub;
+    private final MessageInstance _entry;
+    private ConsumerTarget_0_10 _target;
 
-    public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+    public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
     {
         _entry = entry;
-        _sub = subscription_0_10;
+        _target = target;
     }
 
     public void onAccept()
@@ -45,9 +45,9 @@ class ImplicitAcceptDispositionChangeLis
 
     public void onRelease(boolean setRedelivered)
     {
-        if(_entry.isAcquiredBy(_sub))
+        if(_entry.isAcquiredBy(_target.getConsumer()))
         {
-            getSubscription().release(_entry, setRedelivered);
+            _target.release(_entry, setRedelivered);
         }
         else
         {
@@ -57,9 +57,9 @@ class ImplicitAcceptDispositionChangeLis
 
     public void onReject()
     {
-        if(_entry.isAcquiredBy(_sub))
+        if(_entry.isAcquiredBy(_target.getConsumer()))
         {
-            getSubscription().reject(_entry);
+            _target.reject(_entry);
         }
         else
         {
@@ -70,19 +70,15 @@ class ImplicitAcceptDispositionChangeLis
 
     public boolean acquire()
     {
-        boolean acquired = _entry.acquire(getSubscription());
+        boolean acquired = _entry.acquire(_target.getConsumer());
         if(acquired)
         {
-            getSubscription().recordUnacknowledged(_entry);
+            _target.recordUnacknowledged(_entry);
         }
         return acquired;
 
     }
 
-    public Subscription_0_10 getSubscription()
-    {
-        return _sub;
-    }
 
 
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Fri Feb  7 16:57:49 2014
@@ -21,17 +21,17 @@
 
 package org.apache.qpid.server.protocol.v0_10;
 
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.transport.Method;
 
 public class MessageAcceptCompletionListener implements Method.CompletionListener
 {
-    private final Subscription_0_10 _sub;
-    private final QueueEntry _entry;
+    private final ConsumerTarget_0_10 _sub;
+    private final MessageInstance _entry;
     private final ServerSession _session;
     private boolean _restoreCredit;
 
-    public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
+    public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
     {
         super();
         _sub = sub;
@@ -44,9 +44,9 @@ public class MessageAcceptCompletionList
     {
         if(_restoreCredit)
         {
-            _sub.restoreCredit(_entry);
+            _sub.restoreCredit(_entry.getMessage());
         }
-        if(_entry.isAcquiredBy(_sub))
+        if(_entry.isAcquiredBy(_sub.getConsumer()))
         {
             _session.acknowledge(_sub, _entry);
         }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java Fri Feb  7 16:57:49 2014
@@ -141,7 +141,7 @@ public class MessageMetaData_0_10 implem
         return buf;
     }
 
-    public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+    public int writeToBuffer(ByteBuffer dest)
     {
         ByteBuffer buf = _encoded;
 
@@ -153,7 +153,7 @@ public class MessageMetaData_0_10 implem
 
         buf = buf.duplicate();
 
-        buf.position(offsetInMetaData);
+        buf.position(0);
 
         if(dest.remaining() < buf.limit())
         {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Fri Feb  7 16:57:49 2014
@@ -282,8 +282,8 @@ public class ServerConnectionDelegate ex
     private void stopAllSubscriptions(Connection conn, SessionDetach dtc)
     {
         final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
-        final Collection<Subscription_0_10> subs = ssn.getSubscriptions();
-        for (Subscription_0_10 subscription_0_10 : subs)
+        final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
+        for (ConsumerTarget_0_10 subscription_0_10 : subs)
         {
             subscription_0_10.stop();
         }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Feb  7 16:57:49 2014
@@ -46,7 +46,7 @@ import org.apache.qpid.AMQStoreException
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.TransactionTimeoutHelper;
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -55,15 +55,16 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.AlreadyKnownDtxException;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.DistributedTransaction;
@@ -77,6 +78,7 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.txn.SuspendAndFailDtxException;
 import org.apache.qpid.server.txn.TimeoutDtxException;
 import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.*;
 import org.slf4j.Logger;
@@ -104,14 +106,7 @@ public class ServerSession extends Sessi
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
     private ChannelLogSubject _logSubject;
     private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
-    private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction()
-    {
-        @Override
-        public void onEnqueue(final QueueEntry entry)
-        {
-            entry.getQueue().checkCapacity(ServerSession.this);
-        }
-    };
+    private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
 
     public static interface MessageDispositionChangeListener
     {
@@ -126,12 +121,6 @@ public class ServerSession extends Sessi
 
     }
 
-    public static interface Task
-    {
-        public void doTask(ServerSession session);
-    }
-
-
     private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
             new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
 
@@ -142,9 +131,9 @@ public class ServerSession extends Sessi
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
 
-    private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
+    private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
 
-    private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+    private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
 
     private final TransactionTimeoutHelper _transactionTimeoutHelper;
 
@@ -194,7 +183,7 @@ public class ServerSession extends Sessi
 
     public int enqueue(final MessageTransferMessage message,
                        final InstanceProperties instanceProperties,
-                       final Exchange exchange)
+                       final MessageDestination exchange)
     {
         if(_outstandingCredit.get() != UNLIMITED_CREDIT
                 && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -386,9 +375,9 @@ public class ServerSession extends Sessi
         }
         _messageDispositionListenerMap.clear();
 
-        for (Task task : _taskList)
+        for (Action<ServerSession> task : _taskList)
         {
-            task.doTask(this);
+            task.performAction(this);
         }
 
         LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get();
@@ -405,9 +394,9 @@ public class ServerSession extends Sessi
         // Broker shouldn't block awaiting close - thus do override this method to do nothing
     }
 
-    public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
+    public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
     {
-        _transaction.dequeue(entry.getQueue(), entry.getMessage(),
+        _transaction.dequeue(entry.getOwningResource(), entry.getMessage(),
                              new ServerTransaction.Action()
                              {
 
@@ -426,42 +415,26 @@ public class ServerSession extends Sessi
                              });
     }
 
-    public Collection<Subscription_0_10> getSubscriptions()
+    public Collection<ConsumerTarget_0_10> getSubscriptions()
     {
         return _subscriptions.values();
     }
 
-    public void register(String destination, Subscription_0_10 sub)
+    public void register(String destination, ConsumerTarget_0_10 sub)
     {
         _subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub);
     }
 
-    public Subscription_0_10 getSubscription(String destination)
+    public ConsumerTarget_0_10 getSubscription(String destination)
     {
         return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
     }
 
-    public void unregister(Subscription_0_10 sub)
+    public void unregister(ConsumerTarget_0_10 sub)
     {
         _subscriptions.remove(sub.getName());
-        try
-        {
-            sub.getSendLock();
-            AMQQueue queue = sub.getQueue();
-            if(queue != null)
-            {
-                queue.unregisterSubscription(sub);
-            }
-        }
-        catch (AMQException e)
-        {
-            // TODO
-            _logger.error("Failed to unregister subscription :" + e.getMessage(), e);
-        }
-        finally
-        {
-            sub.releaseSendLock();
-        }
+        sub.close();
+
     }
 
     public boolean isTransactional()
@@ -638,12 +611,12 @@ public class ServerSession extends Sessi
         return getConnection().getAuthorizedSubject();
     }
 
-    public void addSessionCloseTask(Task task)
+    public void addSessionCloseTask(Action<ServerSession> task)
     {
         _taskList.add(task);
     }
 
-    public void removeSessionCloseTask(Task task)
+    public void removeSessionCloseTask(Action<ServerSession> task)
     {
         _taskList.remove(task);
     }
@@ -829,8 +802,8 @@ public class ServerSession extends Sessi
 
     void unregisterSubscriptions()
     {
-        final Collection<Subscription_0_10> subscriptions = getSubscriptions();
-        for (Subscription_0_10 subscription_0_10 : subscriptions)
+        final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+        for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
         {
             unregister(subscription_0_10);
         }
@@ -838,8 +811,8 @@ public class ServerSession extends Sessi
 
     void stopSubscriptions()
     {
-        final Collection<Subscription_0_10> subscriptions = getSubscriptions();
-        for (Subscription_0_10 subscription_0_10 : subscriptions)
+        final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+        for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
         {
             subscription_0_10.stop();
         }
@@ -848,8 +821,8 @@ public class ServerSession extends Sessi
 
     public void receivedComplete()
     {
-        final Collection<Subscription_0_10> subscriptions = getSubscriptions();
-        for (Subscription_0_10 subscription_0_10 : subscriptions)
+        final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+        for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
         {
             subscription_0_10.flushCreditState(false);
         }
@@ -955,4 +928,16 @@ public class ServerSession extends Sessi
         return getId().compareTo(o.getId());
     }
 
+    private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>>
+    {
+        @Override
+        public void performAction(final MessageInstance<C> entry)
+        {
+            TransactionLogResource queue = entry.getOwningResource();
+            if(queue instanceof CapacityChecker)
+            {
+                ((CapacityChecker)queue).checkCapacity(ServerSession.this);
+            }
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Feb  7 16:57:49 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.util.EnumSet;
 import java.util.LinkedHashMap;
 import java.util.UUID;
 import org.apache.log4j.Logger;
@@ -34,7 +35,9 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
@@ -45,6 +48,7 @@ import org.apache.qpid.server.store.Dura
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AlreadyKnownDtxException;
 import org.apache.qpid.server.txn.DtxNotSelectedException;
 import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -55,6 +59,7 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.txn.SuspendAndFailDtxException;
 import org.apache.qpid.server.txn.TimeoutDtxException;
 import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.ExchangeExistsException;
 import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
 import org.apache.qpid.server.virtualhost.RequiredExchangeException;
@@ -193,7 +198,7 @@ public class ServerSessionDelegate exten
                 String queueName = method.getQueue();
                 VirtualHost vhost = getVirtualHost(session);
 
-                final AMQQueue queue = vhost.getQueue(queueName);
+                final MessageSource queue = vhost.getMessageSource(queueName);
 
                 if(queue == null)
                 {
@@ -214,9 +219,9 @@ public class ServerSessionDelegate exten
                         ServerSession s = (ServerSession) session;
                         queue.setExclusiveOwningSession(s);
 
-                        ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+                        ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
                         {
-                            public void doTask(ServerSession session)
+                            public void performAction(ServerSession session)
                             {
                                 if(queue.getExclusiveOwningSession() == session)
                                 {
@@ -228,9 +233,9 @@ public class ServerSessionDelegate exten
                         if(queue.getAuthorizationHolder() == null)
                         {
                             queue.setAuthorizationHolder(s);
-                            ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+                            ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
                             {
-                                public void doTask(ServerSession session)
+                                public void performAction(ServerSession session)
                                 {
                                     if(queue.getAuthorizationHolder() == session)
                                     {
@@ -254,25 +259,42 @@ public class ServerSessionDelegate exten
                         return;
                     }
 
-                    Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
-                                                                  destination,
-                                                                  method.getAcceptMode(),
-                                                                  method.getAcquireMode(),
-                                                                  MessageFlowMode.WINDOW,
-                                                                  creditManager,
-                                                                  filterManager,
-                                                                  method.getArguments());
+                    ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
+                                                                                 method.getAcceptMode(),
+                                                                                 method.getAcquireMode(),
+                                                                                 MessageFlowMode.WINDOW,
+                                                                                 creditManager,
+                                                                                 method.getArguments()
+                    );
 
-                    ((ServerSession)session).register(destination, sub);
+                    ((ServerSession)session).register(destination, target);
                     try
                     {
-                        queue.registerSubscription(sub, method.getExclusive());
+                        EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+                        if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
+                        {
+                            options.add(Consumer.Option.ACQUIRES);
+                        }
+                        if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+                        {
+                            options.add(Consumer.Option.SEES_REQUEUES);
+                        }
+                        if(method.getExclusive())
+                        {
+                            options.add(Consumer.Option.EXCLUSIVE);
+                        }
+                        Consumer sub =
+                                queue.addConsumer(target,
+                                                  filterManager,
+                                                  MessageTransferMessage.class,
+                                                  destination,
+                                                  options);
                     }
-                    catch (AMQQueue.ExistingExclusiveSubscription existing)
+                    catch (AMQQueue.ExistingExclusiveConsumer existing)
                     {
                         exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
                     }
-                    catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive)
+                    catch (AMQQueue.ExistingConsumerPreventsExclusive exclusive)
                     {
                         exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
                     }
@@ -288,7 +310,7 @@ public class ServerSessionDelegate exten
     @Override
     public void messageTransfer(Session ssn, final MessageTransfer xfr)
     {
-        final Exchange exchange = getExchangeForMessage(ssn, xfr);
+        final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
 
         final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
         if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -307,7 +329,6 @@ public class ServerSessionDelegate exten
             return;
         }
 
-        final Exchange exchangeInUse;
         final MessageStore store = getVirtualHost(ssn).getMessageStore();
         final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
         final ServerSession serverSession = (ServerSession) ssn;
@@ -385,7 +406,7 @@ public class ServerSessionDelegate exten
     {
         String destination = method.getDestination();
 
-        Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
 
         if(sub == null)
         {
@@ -393,12 +414,7 @@ public class ServerSessionDelegate exten
         }
         else
         {
-            AMQQueue queue = sub.getQueue();
             ((ServerSession)session).unregister(sub);
-            if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
-            {
-                queue.setAuthorizationHolder(null);
-            }
         }
     }
 
@@ -407,7 +423,7 @@ public class ServerSessionDelegate exten
     {
         String destination = method.getDestination();
 
-        Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
 
         if(sub == null)
         {
@@ -814,24 +830,24 @@ public class ServerSessionDelegate exten
         return getVirtualHost(session).getExchange(exchangeName);
     }
 
-    private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
+    private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr)
     {
         VirtualHost virtualHost = getVirtualHost(ssn);
 
-        Exchange exchange;
+        MessageDestination destination;
         if(xfr.hasDestination())
         {
-            exchange = virtualHost.getExchange(xfr.getDestination());
-            if(exchange == null)
+            destination = virtualHost.getMessageDestination(xfr.getDestination());
+            if(destination == null)
             {
-                exchange = virtualHost.getDefaultExchange();
+                destination = virtualHost.getDefaultExchange();
             }
         }
         else
         {
-            exchange = virtualHost.getDefaultExchange();
+            destination = virtualHost.getDefaultExchange();
         }
-        return exchange;
+        return destination;
     }
 
     private VirtualHost getVirtualHost(Session session)
@@ -1249,9 +1265,9 @@ public class ServerSessionDelegate exten
                 if (autoDelete && exclusive)
                 {
                     final AMQQueue q = queue;
-                    final ServerSession.Task deleteQueueTask = new ServerSession.Task()
+                    final Action<ServerSession> deleteQueueTask = new Action<ServerSession>()
                         {
-                            public void doTask(ServerSession session)
+                            public void performAction(ServerSession session)
                             {
                                 try
                                 {
@@ -1265,9 +1281,9 @@ public class ServerSessionDelegate exten
                         };
                     final ServerSession s = (ServerSession) session;
                     s.addSessionCloseTask(deleteQueueTask);
-                    queue.addQueueDeleteTask(new AMQQueue.Task()
+                    queue.addQueueDeleteTask(new Action<AMQQueue>()
                         {
-                            public void doTask(AMQQueue queue) throws AMQException
+                            public void performAction(AMQQueue queue)
                             {
                                 s.removeSessionCloseTask(deleteQueueTask);
                             }
@@ -1276,9 +1292,9 @@ public class ServerSessionDelegate exten
                 if (exclusive)
                 {
                     final AMQQueue q = queue;
-                    final ServerSession.Task removeExclusive = new ServerSession.Task()
+                    final Action<ServerSession> removeExclusive = new Action<ServerSession>()
                     {
-                        public void doTask(ServerSession session)
+                        public void performAction(ServerSession session)
                         {
                             q.setAuthorizationHolder(null);
                             q.setExclusiveOwningSession(null);
@@ -1287,9 +1303,9 @@ public class ServerSessionDelegate exten
                     final ServerSession s = (ServerSession) session;
                     q.setExclusiveOwningSession(s);
                     s.addSessionCloseTask(removeExclusive);
-                    queue.addQueueDeleteTask(new AMQQueue.Task()
+                    queue.addQueueDeleteTask(new Action<AMQQueue>()
                     {
-                        public void doTask(AMQQueue queue) throws AMQException
+                        public void performAction(AMQQueue queue)
                         {
                             s.removeSessionCloseTask(removeExclusive);
                         }
@@ -1461,7 +1477,7 @@ public class ServerSessionDelegate exten
     {
         String destination = sfm.getDestination();
 
-        Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
 
         if(sub == null)
         {
@@ -1478,7 +1494,7 @@ public class ServerSessionDelegate exten
     {
         String destination = stop.getDestination();
 
-        Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
 
         if(sub == null)
         {
@@ -1496,7 +1512,7 @@ public class ServerSessionDelegate exten
     {
         String destination = flow.getDestination();
 
-        Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
 
         if(sub == null)
         {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Feb  7 16:57:49 2014
@@ -21,19 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -42,6 +30,7 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -55,6 +44,7 @@ import org.apache.qpid.server.Transactio
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.logging.LogActor;
@@ -66,25 +56,28 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.TransportException;
 
@@ -122,7 +115,7 @@ public class AMQChannel implements AMQSe
     private IncomingMessage _currentMessage;
 
     /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
-    private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
+    private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
 
     private final MessageStore _messageStore;
 
@@ -155,7 +148,7 @@ public class AMQChannel implements AMQSe
     private volatile boolean _rollingBack;
 
     private static final Runnable NULL_TASK = new Runnable() { public void run() {} };
-    private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
+    private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
     private static final
     AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
     private long _createTime = System.currentTimeMillis();
@@ -266,7 +259,7 @@ public class AMQChannel implements AMQSe
         return _channelId;
     }
 
-    public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException
+    public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) throws AMQSecurityException
     {
         String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
         SecurityManager securityManager = getVirtualHost().getSecurityManager();
@@ -275,7 +268,7 @@ public class AMQChannel implements AMQSe
             throw new AMQSecurityException("Permission denied: " + e.getName());
         }
         _currentMessage = new IncomingMessage(info);
-        _currentMessage.setExchange(e);
+        _currentMessage.setMessageDestination(e);
     }
 
     public void publishContentHeader(ContentHeaderBody contentHeaderBody)
@@ -360,7 +353,7 @@ public class AMQChannel implements AMQSe
                                     }
                                 };
 
-                        int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction,
+                        int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction,
                                                                           immediate ? _immediateAction : _capacityCheckAction);
                         if(enqueues == 0)
                         {
@@ -497,62 +490,89 @@ public class AMQChannel implements AMQSe
     }
 
 
-    public Subscription getSubscription(AMQShortString subscription)
+    public Consumer getSubscription(AMQShortString tag)
     {
-        return _tag2SubscriptionMap.get(subscription);
+        final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
+        return target == null ? null : target.getConsumer();
     }
 
     /**
      * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
      * up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
      *
+     *
      * @param tag       the tag chosen by the client (if null, server will generate one)
-     * @param queue     the queue to subscribe to
+     * @param source     the queue to subscribe to
      * @param acks      Are acks enabled for this subscriber
      * @param filters   Filters to apply to this subscriber
      *
-     * @param noLocal   Flag stopping own messages being received.
      * @param exclusive Flag requesting exclusive access to the queue
      * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
      *
      * @throws AMQException                  if something goes wrong
      */
-    public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
-                                           FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+    public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
+                                            FieldTable filters, boolean exclusive) throws AMQException
     {
         if (tag == null)
         {
             tag = new AMQShortString("sgen_" + getNextConsumerTag());
         }
 
-        if (_tag2SubscriptionMap.containsKey(tag))
+        if (_tag2SubscriptionTargetMap.containsKey(tag))
         {
             throw new AMQException("Consumer already exists with same tag: " + tag);
         }
 
-         Subscription subscription =
-                SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
+        ConsumerTarget_0_8 target;
+        EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+
+        if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
+        {
+            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+        }
+        else if(acks)
+        {
+            target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+            options.add(Consumer.Option.ACQUIRES);
+            options.add(Consumer.Option.SEES_REQUEUES);
+        }
+        else
+        {
+            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+            options.add(Consumer.Option.ACQUIRES);
+            options.add(Consumer.Option.SEES_REQUEUES);
+        }
 
+        if(exclusive)
+        {
+            options.add(Consumer.Option.EXCLUSIVE);
+        }
 
         // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
         // We add before we register as the Async Delivery process may AutoClose the subscriber
         // so calling _cT2QM.remove before we have done put which was after the register succeeded.
         // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
 
-        _tag2SubscriptionMap.put(tag, subscription);
+        _tag2SubscriptionTargetMap.put(tag, target);
 
         try
         {
-            queue.registerSubscription(subscription, exclusive);
+            Consumer sub =
+                    source.addConsumer(target,
+                                      FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
+                                      AMQMessage.class,
+                                      AMQShortString.toString(tag),
+                                      options);
         }
         catch (AMQException e)
         {
-            _tag2SubscriptionMap.remove(tag);
+            _tag2SubscriptionTargetMap.remove(tag);
             throw e;
         }
         catch (RuntimeException e)
         {
-            _tag2SubscriptionMap.remove(tag);
+            _tag2SubscriptionTargetMap.remove(tag);
             throw e;
         }
         return tag;
@@ -567,18 +587,11 @@ public class AMQChannel implements AMQSe
     public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
     {
 
-        Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
+        ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
+        Consumer sub = target == null ? null : target.getConsumer();
         if (sub != null)
         {
-            try
-            {
-                sub.getSendLock();
-                sub.getQueue().unregisterSubscription(sub);
-            }
-            finally
-            {
-                sub.releaseSendLock();
-            }
+            sub.close();
             return true;
         }
         else
@@ -633,7 +646,7 @@ public class AMQChannel implements AMQSe
     {
         if (_logger.isInfoEnabled())
         {
-            if (!_tag2SubscriptionMap.isEmpty())
+            if (!_tag2SubscriptionTargetMap.isEmpty())
             {
                 _logger.info("Unsubscribing all consumers on channel " + toString());
             }
@@ -643,28 +656,21 @@ public class AMQChannel implements AMQSe
             }
         }
 
-        for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
+        for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
         {
             if (_logger.isInfoEnabled())
             {
                 _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
             }
 
-            Subscription sub = me.getValue();
+            Consumer sub = me.getValue().getConsumer();
 
-            try
-            {
-                sub.getSendLock();
-                sub.getQueue().unregisterSubscription(sub);
-            }
-            finally
-            {
-                sub.releaseSendLock();
-            }
+
+            sub.close();
 
         }
 
-        _tag2SubscriptionMap.clear();
+        _tag2SubscriptionTargetMap.clear();
     }
 
     /**
@@ -673,24 +679,15 @@ public class AMQChannel implements AMQSe
      * @param entry       the record of the message on the queue that was delivered
      * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
      *                    delivery tag)
-     * @param subscription The consumer that is to acknowledge this message.
+     * @param consumer The consumer that is to acknowledge this message.
      */
-    public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
+    public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer)
     {
         if (_logger.isDebugEnabled())
         {
-            if (entry.getQueue() == null)
-            {
-                _logger.debug("Adding unacked message with a null queue:" + entry);
-            }
-            else
-            {
-                if (_logger.isDebugEnabled())
-                {
                     _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
-                               + ") with a queue(" + entry.getQueue() + ") for " + subscription);
-                }
-            }
+                               + ") for " + consumer + " on " + entry.getOwningResource().getName());
+
         }
 
         _unacknowledgedMessageMap.add(deliveryTag, entry);
@@ -713,7 +710,7 @@ public class AMQChannel implements AMQSe
     public void requeue() throws AMQException
     {
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
-        Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+        Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
 
         if (!messagesToBeDelivered.isEmpty())
         {
@@ -724,21 +721,13 @@ public class AMQChannel implements AMQSe
 
         }
 
-        for (QueueEntry unacked : messagesToBeDelivered)
+        for (MessageInstance unacked : messagesToBeDelivered)
         {
-            if (!unacked.isQueueDeleted())
-            {
-                // Mark message redelivered
-                unacked.setRedelivered();
-
-                // Ensure message is released for redelivery
-                unacked.release();
+            // Mark message redelivered
+            unacked.setRedelivered();
 
-            }
-            else
-            {
-                unacked.delete();
-            }
+            // Ensure message is released for redelivery
+            unacked.release();
         }
 
     }
@@ -752,7 +741,7 @@ public class AMQChannel implements AMQSe
      */
     public void requeue(long deliveryTag) throws AMQException
     {
-        QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag);
+        MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag);
 
         if (unacked != null)
         {
@@ -760,20 +749,8 @@ public class AMQChannel implements AMQSe
             unacked.setRedelivered();
 
             // Ensure message is released for redelivery
-            if (!unacked.isQueueDeleted())
-            {
-
-                // Ensure message is released for redelivery
-                unacked.release();
-
-            }
-            else
-            {
-                _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
-                          + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
+            unacked.release();
 
-                unacked.delete();
-            }
         }
         else
         {
@@ -786,10 +763,10 @@ public class AMQChannel implements AMQSe
 
     public boolean isMaxDeliveryCountEnabled(final long deliveryTag)
     {
-        final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+        final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
         if (queueEntry != null)
         {
-            final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+            final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
             return maximumDeliveryCount > 0;
         }
 
@@ -798,10 +775,10 @@ public class AMQChannel implements AMQSe
 
     public boolean isDeliveredTooManyTimes(final long deliveryTag)
     {
-        final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+        final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
         if (queueEntry != null)
         {
-            final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+            final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
             final int numDeliveries = queueEntry.getDeliveryCount();
             return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
         }
@@ -812,16 +789,14 @@ public class AMQChannel implements AMQSe
     /**
      * Called to resend all outstanding unacknowledged messages to this same channel.
      *
-     * @param requeue Are the messages to be requeued or dropped.
-     *
      * @throws AMQException When something goes wrong.
      */
-    public void resend(final boolean requeue) throws AMQException
+    public void resend() throws AMQException
     {
 
 
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+        final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+        final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
 
         if (_logger.isDebugEnabled())
         {
@@ -833,9 +808,8 @@ public class AMQChannel implements AMQSe
         // and those that don't to be requeued.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap,
                                                                     msgToRequeue,
-                                                                    msgToResend,
-                                                                    requeue,
-                                                                    _messageStore));
+                                                                    msgToResend
+        ));
 
 
         // Process Messages to Resend
@@ -851,39 +825,20 @@ public class AMQChannel implements AMQSe
             }
         }
 
-        for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
+        for (Map.Entry<Long, MessageInstance> entry : msgToResend.entrySet())
         {
-            QueueEntry message = entry.getValue();
+            MessageInstance message = entry.getValue();
             long deliveryTag = entry.getKey();
 
             //Amend the delivery counter as the client hasn't seen these messages yet.
             message.decrementDeliveryCount();
 
-            AMQQueue queue = message.getQueue();
-
             // Without any details from the client about what has been processed we have to mark
             // all messages in the unacked map as redelivered.
             message.setRedelivered();
 
-            Subscription sub = message.getDeliveredSubscription();
-
-            if (sub != null)
-            {
-
-                if(!queue.resend(message,sub))
-                {
-                    msgToRequeue.put(deliveryTag, message);
-                }
-            }
-            else
+            if (!message.resend())
             {
-
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
-                              + ")to prevent loss");
-                }
-                // move this message to requeue
                 msgToRequeue.put(deliveryTag, message);
             }
         } // for all messages
@@ -898,9 +853,9 @@ public class AMQChannel implements AMQSe
         }
 
         // Process Messages to Requeue at the front of the queue
-        for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
+        for (Map.Entry<Long, MessageInstance> entry : msgToRequeue.entrySet())
         {
-            QueueEntry message = entry.getValue();
+            MessageInstance message = entry.getValue();
             long deliveryTag = entry.getKey();
 
             //Amend the delivery counter as the client hasn't seen these messages yet.
@@ -926,11 +881,11 @@ public class AMQChannel implements AMQSe
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
     {
-        Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
+        Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
         _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
     }
 
-    private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
+    private Collection<MessageInstance> getAckedMessages(long deliveryTag, boolean multiple)
     {
 
         return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
@@ -976,9 +931,9 @@ public class AMQChannel implements AMQSe
             if (wasSuspended)
             {
                 // may need to deliver queued messages
-                for (Subscription s : _tag2SubscriptionMap.values())
+                for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
                 {
-                    s.getQueue().deliverAsync(s);
+                    s.getConsumer().externalStateChange();
                 }
             }
 
@@ -992,15 +947,15 @@ public class AMQChannel implements AMQSe
             if (!wasSuspended)
             {
                 // may need to deliver queued messages
-                for (Subscription s : _tag2SubscriptionMap.values())
+                for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
                 {
                     try
                     {
-                        s.getSendLock();
+                        s.getConsumer().getSendLock();
                     }
                     finally
                     {
-                        s.releaseSendLock();
+                        s.getConsumer().releaseSendLock();
                     }
                 }
             }
@@ -1077,10 +1032,10 @@ public class AMQChannel implements AMQSe
         boolean requiresSuspend = _suspended.compareAndSet(false,true);
 
         // ensure all subscriptions have seen the change to the channel state
-        for(Subscription sub : _tag2SubscriptionMap.values())
+        for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
         {
-            sub.getSendLock();
-            sub.releaseSendLock();
+            sub.getConsumer().getSendLock();
+            sub.getConsumer().releaseSendLock();
         }
 
         try
@@ -1098,16 +1053,16 @@ public class AMQChannel implements AMQSe
 
         postRollbackTask.run();
 
-        for(QueueEntry entry : _resendList)
+        for(MessageInstance entry : _resendList)
         {
-            Subscription sub = entry.getDeliveredSubscription();
+            Consumer sub = entry.getDeliveredConsumer();
             if(sub == null || sub.isClosed())
             {
                 entry.release();
             }
             else
             {
-                sub.getQueue().resend(entry, sub);
+                entry.resend();
             }
         }
         _resendList.clear();
@@ -1115,9 +1070,9 @@ public class AMQChannel implements AMQSe
         if(requiresSuspend)
         {
             _suspended.set(false);
-            for(Subscription sub : _tag2SubscriptionMap.values())
+            for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
             {
-                sub.getQueue().deliverAsync(sub);
+                sub.getConsumer().externalStateChange();
             }
 
         }
@@ -1173,7 +1128,7 @@ public class AMQChannel implements AMQSe
     private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
         {
 
-            public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
             {
                 addUnacknowledgedMessage(entry, deliveryTag, sub);
             }
@@ -1234,78 +1189,96 @@ public class AMQChannel implements AMQSe
     }
 
 
-    private class ImmediateAction implements BaseQueue.PostEnqueueAction
+    private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<C>>
     {
 
         public ImmediateAction()
         {
         }
 
-        public void onEnqueue(QueueEntry entry)
+        public void performAction(MessageInstance<C> entry)
         {
-            AMQQueue queue = entry.getQueue();
+            TransactionLogResource queue = entry.getOwningResource();
 
             if (!entry.getDeliveredToConsumer() && entry.acquire())
             {
 
                 ServerTransaction txn = new LocalTransaction(_messageStore);
-                Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
-                entries.add(entry);
                 final AMQMessage message = (AMQMessage) entry.getMessage();
-                txn.dequeue(queue, entry.getMessage(),
-                            new MessageAcknowledgeAction(entries)
-                            {
-                                @Override
-                                public void postCommit()
+                MessageReference ref = message.newReference();
+                try
+                {
+                    entry.delete();
+                    txn.dequeue(queue, message,
+                                new ServerTransaction.Action()
                                 {
-                                    try
+                                    @Override
+                                    public void postCommit()
                                     {
-                                        final
-                                        ProtocolOutputConverter outputConverter =
-                                                _session.getProtocolOutputConverter();
-
-                                        outputConverter.writeReturn(message.getMessagePublishInfo(),
-                                                                    message.getContentHeaderBody(),
-                                                                    message,
-                                                                    _channelId,
-                                                                    AMQConstant.NO_CONSUMERS.getCode(),
-                                                                    IMMEDIATE_DELIVERY_REPLY_TEXT);
+                                        try
+                                        {
+                                            final
+                                            ProtocolOutputConverter outputConverter =
+                                                    _session.getProtocolOutputConverter();
+
+                                            outputConverter.writeReturn(message.getMessagePublishInfo(),
+                                                                        message.getContentHeaderBody(),
+                                                                        message,
+                                                                        _channelId,
+                                                                        AMQConstant.NO_CONSUMERS.getCode(),
+                                                                        IMMEDIATE_DELIVERY_REPLY_TEXT);
+                                        }
+                                        catch (AMQException e)
+                                        {
+                                            throw new RuntimeException(e);
+                                        }
                                     }
-                                    catch (AMQException e)
+
+                                    @Override
+                                    public void onRollback()
                                     {
-                                        throw new RuntimeException(e);
+
                                     }
-                                    super.postCommit();
                                 }
-                            }
-                           );
-                txn.commit();
+                               );
+                    txn.commit();
+                }
+                finally
+                {
+                    ref.release();
+                }
 
 
             }
             else
             {
-                queue.checkCapacity(AMQChannel.this);
+                if(queue instanceof CapacityChecker)
+                {
+                    ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+                }
             }
 
         }
     }
 
-    private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction
+    private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<C>>
     {
         @Override
-        public void onEnqueue(final QueueEntry entry)
+        public void performAction(final MessageInstance<C> entry)
         {
-            AMQQueue queue = entry.getQueue();
-            queue.checkCapacity(AMQChannel.this);
+            TransactionLogResource queue = entry.getOwningResource();
+            if(queue instanceof CapacityChecker)
+            {
+                ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+            }
         }
     }
 
     private class MessageAcknowledgeAction implements ServerTransaction.Action
     {
-        private final Collection<QueueEntry> _ackedMessages;
+        private final Collection<MessageInstance> _ackedMessages;
 
-        public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
+        public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages)
         {
             _ackedMessages = ackedMessages;
         }
@@ -1314,7 +1287,7 @@ public class AMQChannel implements AMQSe
         {
             try
             {
-                for(QueueEntry entry : _ackedMessages)
+                for(MessageInstance entry : _ackedMessages)
                 {
                     entry.delete();
                 }
@@ -1337,10 +1310,10 @@ public class AMQChannel implements AMQSe
             {
                 try
                 {
-                        for(QueueEntry entry : _ackedMessages)
-                        {
-                            entry.release();
-                        }
+                    for(MessageInstance entry : _ackedMessages)
+                    {
+                        entry.release();
+                    }
                 }
                 finally
                 {
@@ -1505,7 +1478,7 @@ public class AMQChannel implements AMQSe
     public void deadLetter(long deliveryTag) throws AMQException
     {
         final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
-        final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag);
+        final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
 
         if (rejectedQueueEntry == null)
         {
@@ -1514,36 +1487,42 @@ public class AMQChannel implements AMQSe
         else
         {
             final ServerMessage msg = rejectedQueueEntry.getMessage();
+            final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
 
-            int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+            int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
                 {
                     @Override
-                    public void onEnqueue(final QueueEntry requeueEntry)
+                    public void performAction(final MessageInstance requeueEntry)
                     {
                         _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
-                                                                                   requeueEntry.getQueue().getName()));
+                                                                                   requeueEntry.getOwningResource().getName()));
                     }
                 }, null);
 
             if(requeues == 0)
             {
-                final AMQQueue queue = rejectedQueueEntry.getQueue();
-
-                final Exchange altExchange = queue.getAlternateExchange();
 
-                if (altExchange == null)
+                final TransactionLogResource owningResource = rejectedQueueEntry.getOwningResource();
+                if(owningResource instanceof AMQQueue)
                 {
-                    _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
-                    _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+                    final AMQQueue queue = (AMQQueue) owningResource;
 
-                }
-                else
-                {
-                    _logger.debug(
-                            "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
-                            + deliveryTag);
-                    _actor.message(_logSubject,
-                                   ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+                    final Exchange altExchange = queue.getAlternateExchange();
+
+                    if (altExchange == null)
+                    {
+                        _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+                        _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+
+                    }
+                    else
+                    {
+                        _logger.debug(
+                                "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+                                + deliveryTag);
+                        _actor.message(_logSubject,
+                                       ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+                    }
                 }
             }
 
@@ -1604,6 +1583,6 @@ public class AMQChannel implements AMQSe
     @Override
     public int getConsumerCount()
     {
-        return _tag2SubscriptionMap.size();
+        return _tag2SubscriptionTargetMap.size();
     }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Fri Feb  7 16:57:49 2014
@@ -94,8 +94,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
@@ -1669,7 +1668,7 @@ public class AMQProtocolEngine implement
         }
 
         @Override
-        public void deliverToClient(final Subscription sub, final ServerMessage message,
+        public void deliverToClient(final Consumer sub, final ServerMessage message,
                                     final InstanceProperties props, final long deliveryTag)
                 throws AMQException
         {
@@ -1678,7 +1677,7 @@ public class AMQProtocolEngine implement
                                                   props,
                                                   _channelId,
                                                   deliveryTag,
-                                                  ((SubscriptionImpl)sub).getConsumerTag());
+                                                  new AMQShortString(sub.getName()));
         }
 
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java Fri Feb  7 16:57:49 2014
@@ -39,7 +39,6 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Fri Feb  7 16:57:49 2014
@@ -23,11 +23,8 @@ package org.apache.qpid.server.protocol.
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.consumer.Consumer;
 
 import java.util.Map;
 
@@ -35,34 +32,28 @@ public class ExtractResendAndRequeue imp
 {
     private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
 
-    private final Map<Long, QueueEntry> _msgToRequeue;
-    private final Map<Long, QueueEntry> _msgToResend;
-    private final boolean _requeueIfUnableToResend;
+    private final Map<Long, MessageInstance> _msgToRequeue;
+    private final Map<Long, MessageInstance> _msgToResend;
     private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
-    private final MessageStore _transactionLog;
 
     public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
-                                   Map<Long, QueueEntry> msgToRequeue,
-                                   Map<Long, QueueEntry> msgToResend,
-                                   boolean requeueIfUnableToResend,
-                                   MessageStore txnLog)
+                                   Map<Long, MessageInstance> msgToRequeue,
+                                   Map<Long, MessageInstance> msgToResend)
     {
         _unacknowledgedMessageMap = unacknowledgedMessageMap;
         _msgToRequeue = msgToRequeue;
         _msgToResend = msgToResend;
-        _requeueIfUnableToResend = requeueIfUnableToResend;
-        _transactionLog = txnLog;
     }
 
-    public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+    public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException
     {
 
         message.setRedelivered();
-        final Subscription subscription = message.getDeliveredSubscription();
-        if (subscription != null)
+        final Consumer consumer = message.getDeliveredConsumer();
+        if (consumer != null)
         {
             // Consumer exists
-            if (!subscription.isClosed())
+            if (!consumer.isClosed())
             {
                 _msgToResend.put(deliveryTag, message);
             }
@@ -73,58 +64,13 @@ public class ExtractResendAndRequeue imp
         }
         else
         {
-            // Message has no consumer tag, so was "delivered" to a GET
-            // or consumer no longer registered
-            // cannot resend, so re-queue.
-            if (!message.isQueueDeleted())
-            {
-                if (_requeueIfUnableToResend)
-                {
-                    _msgToRequeue.put(deliveryTag, message);
-                }
-                else
-                {
-
-                    dequeueEntry(message);
-                    _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
-                }
-            }
-            else
-            {
-                dequeueEntry(message);
-                _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
-            }
+            _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
         }
 
         // false means continue processing
         return false;
     }
 
-
-    private void dequeueEntry(final QueueEntry node)
-    {
-        ServerTransaction txn = new AutoCommitTransaction(_transactionLog);
-        dequeueEntry(node, txn);
-    }
-
-    private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
-    {
-        txn.dequeue(node.getQueue(), node.getMessage(),
-                    new ServerTransaction.Action()
-                    {
-
-                        public void postCommit()
-                        {
-                            node.delete();
-                        }
-
-                        public void onRollback()
-                        {
-
-                        }
-                    });
-    }
-
     public void visitComplete()
     {
         _unacknowledgedMessageMap.clear();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java Fri Feb  7 16:57:49 2014
@@ -20,15 +20,12 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.MessageDestination;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -38,7 +35,7 @@ public class IncomingMessage
 
     private final MessagePublishInfo _messagePublishInfo;
     private ContentHeaderBody _contentHeaderBody;
-    private Exchange _exchange;
+    private MessageDestination _messageDestination;
 
     /**
      * Keeps a track of how many bytes we have received in body frames
@@ -77,9 +74,9 @@ public class IncomingMessage
         return _messagePublishInfo.getExchange();
     }
 
-    public Exchange getExchange()
+    public MessageDestination getDestination()
     {
-        return _exchange;
+        return _messageDestination;
     }
 
     public ContentHeaderBody getContentHeader()
@@ -92,9 +89,9 @@ public class IncomingMessage
         return getContentHeader().getBodySize();
     }
 
-    public void setExchange(final Exchange e)
+    public void setMessageDestination(final MessageDestination e)
     {
-        _exchange = e;
+        _messageDestination = e;
     }
 
     public int getBodyCount() throws AMQException

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Fri Feb  7 16:57:49 2014
@@ -105,7 +105,7 @@ public class MessageMetaData implements 
     }
 
 
-    public int writeToBuffer(int offset, ByteBuffer dest)
+    public int writeToBuffer(ByteBuffer dest)
     {
         int oldPosition = dest.position();
         try



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org