You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/07 17:57:52 UTC

svn commit: r1565726 [5/6] - in /qpid/trunk/qpid/java: ./ amqp-1-0-client-jms/ amqp-1-0-client/ amqp-1-0-common/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrad...

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Fri Feb  7 16:57:49 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.QueueEntry;
 
 import java.util.Collection;
@@ -36,24 +37,24 @@ public interface UnacknowledgedMessageMa
          *@param message the message being iterated over @return true to stop iteration, false to continue
          * @throws AMQException
          */
-        boolean callback(final long deliveryTag, QueueEntry message) throws AMQException;
+        boolean callback(final long deliveryTag, MessageInstance message) throws AMQException;
 
         void visitComplete();
     }
 
     void visit(Visitor visitor) throws AMQException;
 
-    void add(long deliveryTag, QueueEntry message);
+    void add(long deliveryTag, MessageInstance message);
 
-    QueueEntry remove(long deliveryTag);
+    MessageInstance remove(long deliveryTag);
 
-    Collection<QueueEntry> cancelAllMessages();
+    Collection<MessageInstance> cancelAllMessages();
 
     int size();
 
     void clear();
 
-    QueueEntry get(long deliveryTag);
+    MessageInstance get(long deliveryTag);
 
     /**
      * Get the set of delivery tags that are outstanding.
@@ -62,7 +63,7 @@ public interface UnacknowledgedMessageMa
      */
     Set<Long> getDeliveryTags();
 
-    Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple);
+    Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
 
 }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Fri Feb  7 16:57:49 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.QueueEntry;
 
 import java.util.Collection;
@@ -34,7 +35,7 @@ public class UnacknowledgedMessageMapImp
 
     private long _unackedSize;
 
-    private Map<Long, QueueEntry> _map;
+    private Map<Long, MessageInstance> _map;
 
     private long _lastDeliveryTag;
 
@@ -43,10 +44,10 @@ public class UnacknowledgedMessageMapImp
     public UnacknowledgedMessageMapImpl(int prefetchLimit)
     {
         _prefetchLimit = prefetchLimit;
-        _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
+        _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit);
     }
 
-    public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
+    public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
     {
         if (multiple)
         {
@@ -54,7 +55,7 @@ public class UnacknowledgedMessageMapImp
         }
         else
         {
-            final QueueEntry entry = get(deliveryTag);
+            final MessageInstance entry = get(deliveryTag);
             if(entry != null)
             {
                 msgs.put(deliveryTag, entry);
@@ -63,7 +64,7 @@ public class UnacknowledgedMessageMapImp
 
     }
 
-    public void remove(Map<Long,QueueEntry> msgs)
+    public void remove(Map<Long,MessageInstance> msgs)
     {
         synchronized (_lock)
         {
@@ -74,12 +75,12 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public QueueEntry remove(long deliveryTag)
+    public MessageInstance remove(long deliveryTag)
     {
         synchronized (_lock)
         {
 
-            QueueEntry message = _map.remove(deliveryTag);
+            MessageInstance message = _map.remove(deliveryTag);
             if(message != null)
             {
                 _unackedSize -= message.getMessage().getSize();
@@ -94,8 +95,8 @@ public class UnacknowledgedMessageMapImp
     {
         synchronized (_lock)
         {
-            Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
-            for (Map.Entry<Long, QueueEntry> entry : currentEntries)
+            Set<Map.Entry<Long, MessageInstance>> currentEntries = _map.entrySet();
+            for (Map.Entry<Long, MessageInstance> entry : currentEntries)
             {
                 visitor.callback(entry.getKey().longValue(), entry.getValue());
             }
@@ -103,7 +104,7 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public void add(long deliveryTag, QueueEntry message)
+    public void add(long deliveryTag, MessageInstance message)
     {
         synchronized (_lock)
         {
@@ -113,12 +114,12 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public Collection<QueueEntry> cancelAllMessages()
+    public Collection<MessageInstance> cancelAllMessages()
     {
         synchronized (_lock)
         {
-            Collection<QueueEntry> currentEntries = _map.values();
-            _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
+            Collection<MessageInstance> currentEntries = _map.values();
+            _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit);
             _unackedSize = 0l;
             return currentEntries;
         }
@@ -141,7 +142,7 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public QueueEntry get(long key)
+    public MessageInstance get(long key)
     {
         synchronized (_lock)
         {
@@ -157,19 +158,19 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple)
+    public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple)
     {
-        Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
+        Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>();
         collect(deliveryTag, multiple, ackedMessageMap);
         remove(ackedMessageMap);
         return ackedMessageMap.values();
     }
 
-    private void collect(long key, Map<Long, QueueEntry> msgs)
+    private void collect(long key, Map<Long, MessageInstance> msgs)
     {
         synchronized (_lock)
         {
-            for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
+            for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
             {
                 msgs.put(entry.getKey(),entry.getValue());
                 if (entry.getKey() == key)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Fri Feb  7 16:57:49 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.BasicConsumeBody;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -73,7 +74,7 @@ public class BasicConsumeMethodHandler i
                               " args:" + body.getArguments());
             }
 
-            AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
+            MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
 
             if (queue == null)
             {
@@ -120,8 +121,11 @@ public class BasicConsumeMethodHandler i
                     if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
                     {
 
-                        AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
-                                                                              body.getArguments(), body.getNoLocal(), body.getExclusive());
+                        AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
+                                                                               queue,
+                                                                               !body.getNoAck(),
+                                                                               body.getArguments(),
+                                                                               body.getExclusive());
                         if (!body.getNowait())
                         {
                             MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
@@ -156,14 +160,14 @@ public class BasicConsumeMethodHandler i
 
 
                 }
-                catch (AMQQueue.ExistingExclusiveSubscription e)
+                catch (AMQQueue.ExistingExclusiveConsumer e)
                 {
                     throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
                                                    "Cannot subscribe to queue "
                                                    + queue.getName()
                                                    + " as it already has an existing exclusive consumer");
                 }
-                catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+                catch (AMQQueue.ExistingConsumerPreventsExclusive e)
                 {
                     throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
                                                    "Cannot subscribe to queue "

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Fri Feb  7 16:57:49 2014
@@ -24,27 +24,31 @@ package org.apache.qpid.server.protocol.
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicGetBody;
 import org.apache.qpid.framing.BasicGetEmptyBody;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl;
+import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.EnumSet;
+
 public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
 {
     private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
@@ -128,7 +132,7 @@ public class BasicGetMethodHandler imple
         {
 
             @Override
-            public void deliverToClient(final Subscription sub, final ServerMessage message, final
+            public void deliverToClient(final Consumer sub, final ServerMessage message, final
                                         InstanceProperties props, final long deliveryTag)
             throws AMQException
             {
@@ -145,25 +149,32 @@ public class BasicGetMethodHandler imple
         final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
         {
 
-            public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
             {
                 channel.addUnacknowledgedMessage(entry, deliveryTag, null);
             }
         };
 
-        Subscription sub;
+        ConsumerTarget_0_8 target;
+        EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES,
+                                                          Consumer.Option.SEES_REQUEUES);
         if(acks)
         {
-            sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+
+            target = ConsumerTarget_0_8.createAckTarget(channel,
+                                                        AMQShortString.EMPTY_STRING, null,
+                                                        singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
         else
         {
-            sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+            target = ConsumerTarget_0_8.createNoAckTarget(channel,
+                                                          AMQShortString.EMPTY_STRING, null,
+                                                          singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
 
-        queue.registerSubscription(sub,false);
-        queue.flushSubscription(sub);
-        queue.unregisterSubscription(sub);
+        Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+        sub.flush();
+        sub.close();
         return(!singleMessageCredit.hasCredit());
 
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java Fri Feb  7 16:57:49 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -67,7 +68,7 @@ public class BasicPublishMethodHandler i
         }
 
         VirtualHost vHost = session.getVirtualHost();
-        Exchange exch = vHost.getExchange(exchangeName.toString());
+        MessageDestination exch = vHost.getMessageDestination(exchangeName.toString());
         // if the exchange does not exist we raise a channel exception
         if (exch == null)
         {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java Fri Feb  7 16:57:49 2014
@@ -56,7 +56,7 @@ public class BasicRecoverMethodHandler i
             throw body.getChannelNotFoundException(channelId);
         }
 
-        channel.resend(body.getRequeue());
+        channel.resend();
 
         // Qpid 0-8 hacks a synchronous -ok onto recover.
         // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java Fri Feb  7 16:57:49 2014
@@ -58,7 +58,7 @@ public class BasicRecoverSyncMethodHandl
             throw body.getChannelNotFoundException(channelId);
         }
         channel.sync();
-        channel.resend(body.getRequeue());
+        channel.resend();
 
         // Qpid 0-8 hacks a synchronous -ok onto recover.
         // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java Fri Feb  7 16:57:49 2014
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -65,7 +66,7 @@ public class BasicRejectMethodHandler im
 
         long deliveryTag = body.getDeliveryTag();
 
-        QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+        MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
 
         if (message == null)
         {
@@ -73,16 +74,6 @@ public class BasicRejectMethodHandler im
         }
         else
         {
-            if (message.isQueueDeleted())
-            {
-                _logger.warn("Message's Queue has already been purged, dropping message");
-                message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
-                if(message != null)
-                {
-                    message.delete();
-                }
-                return;
-            }
 
             if (message.getMessage() == null)
             {
@@ -98,41 +89,43 @@ public class BasicRejectMethodHandler im
                               " on channel:" + channel.debugIdentity());
             }
 
-            message.reject();
-
             if (body.getRequeue())
             {
-                channel.requeue(deliveryTag);
-
                 //this requeue represents a message rejected from the pre-dispatch queue
                 //therefore we need to amend the delivery counter.
                 message.decrementDeliveryCount();
+
+                channel.requeue(deliveryTag);
             }
             else
             {
-                 final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
-                 _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
-                 if (maxDeliveryCountEnabled)
-                 {
-                     final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
-                     _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
-                     if (deliveredTooManyTimes)
-                     {
-                         channel.deadLetter(body.getDeliveryTag());
-                     }
-                     else
-                     {
-                         //this requeue represents a message rejected because of a recover/rollback that we
-                         //are not ready to DLQ. We rely on the reject command to resend from the unacked map
-                         //and therefore need to increment the delivery counter so we cancel out the effect
-                         //of the AMQChannel#resend() decrement.
-                         message.incrementDeliveryCount();
-                     }
-                 }
-                 else
-                 {
-                     channel.requeue(deliveryTag);
-                 }
+                // Since the Java client abuses the reject flag for requeuing after rollback, we won't set reject here
+                // as it would prevent redelivery
+                // message.reject();
+
+                final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+                _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
+                if (maxDeliveryCountEnabled)
+                {
+                    final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+                    _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
+                    if (deliveredTooManyTimes)
+                    {
+                        channel.deadLetter(body.getDeliveryTag());
+                    }
+                    else
+                    {
+                        //this requeue represents a message rejected because of a recover/rollback that we
+                        //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+                        //and therefore need to increment the delivery counter so we cancel out the effect
+                        //of the AMQChannel#resend() decrement.
+                        message.incrementDeliveryCount();
+                    }
+                }
+                else
+                {
+                    channel.requeue(deliveryTag);
+                }
             }
         }
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java Fri Feb  7 16:57:49 2014
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
@@ -134,8 +135,8 @@ public class QueueDeclareHandler impleme
                             }
                         };
                         protocolConnection.addSessionCloseTask(sessionCloseTask);
-                        queue.addQueueDeleteTask(new AMQQueue.Task() {
-                            public void doTask(AMQQueue queue) throws AMQException
+                        queue.addQueueDeleteTask(new Action<AMQQueue>() {
+                            public void performAction(AMQQueue queue)
                             {
                                 protocolConnection.removeSessionCloseTask(sessionCloseTask);
                             }
@@ -245,9 +246,9 @@ public class QueueDeclareHandler impleme
 
             session.addSessionCloseTask(deleteQueueTask);
 
-            queue.addQueueDeleteTask(new AMQQueue.Task()
+            queue.addQueueDeleteTask(new Action<AMQQueue>()
             {
-                public void doTask(AMQQueue queue)
+                public void performAction(AMQQueue queue)
                 {
                     session.removeSessionCloseTask(deleteQueueTask);
                 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java Fri Feb  7 16:57:49 2014
@@ -74,7 +74,7 @@ public class TxRollbackHandler implement
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
             // Why, are we not allowed to send messages back to client before the ok method?
-            channel.resend(false);
+            channel.resend();
 
         }
         catch (AMQException e)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Fri Feb  7 16:57:49 2014
@@ -28,11 +28,11 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.flow.LimitlessCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.BrokerTestHelper;
@@ -40,6 +40,7 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.test.utils.QpidTestCase;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.Set;
 
 /**
@@ -47,7 +48,8 @@ import java.util.Set;
  */
 public class AckTest extends QpidTestCase
 {
-    private Subscription _subscription;
+    private ConsumerTarget_0_8 _subscriptionTarget;
+    private Consumer _consumer;
 
     private AMQProtocolSession _protocolSession;
 
@@ -86,7 +88,6 @@ public class AckTest extends QpidTestCas
 
     private void publishMessages(int count, boolean persistent) throws AMQException
     {
-        _queue.registerSubscription(_subscription,false);
         for (int i = 1; i <= count; i++)
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -144,7 +145,7 @@ public class AckTest extends QpidTestCas
                     try
                     {
 
-                        _queue.enqueue(message);
+                        _queue.enqueue(message,null);
                     }
                     catch (AMQException e)
                     {
@@ -178,7 +179,13 @@ public class AckTest extends QpidTestCas
      */
     public void testAckChannelAssociationTest() throws AMQException
     {
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+                                                                 DEFAULT_CONSUMER_TAG,
+                                                                 null,
+                                                                 new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
         final int msgCount = 10;
         publishMessages(msgCount, true);
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -190,8 +197,8 @@ public class AckTest extends QpidTestCas
         {
             assertTrue(deliveryTag == i);
             i++;
-            QueueEntry unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.getQueue() == _queue);
+            MessageInstance unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.getOwningResource() == _queue);
         }
 
     }
@@ -202,7 +209,16 @@ public class AckTest extends QpidTestCas
     public void testNoAckMode() throws AMQException
     {
         // false arg means no acks expected
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false, null, false, new LimitlessCreditManager());
+        _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+                                                                   DEFAULT_CONSUMER_TAG,
+                                                                   null,
+                                                                   new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget,
+                                       null,
+                                       AMQMessage.class,
+                                       DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
         final int msgCount = 10;
         publishMessages(msgCount);
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -218,7 +234,13 @@ public class AckTest extends QpidTestCas
     public void testPersistentNoAckMode() throws AMQException
     {
         // false arg means no acks expected
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, false,null,false, new LimitlessCreditManager());
+
+        _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+                                                                   DEFAULT_CONSUMER_TAG,
+                                                                   null,
+                                                                   new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES));
         final int msgCount = 10;
         publishMessages(msgCount, true);
 
@@ -235,7 +257,15 @@ public class AckTest extends QpidTestCas
      */
     public void testSingleAckReceivedTest() throws AMQException
     {
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+                                                                 DEFAULT_CONSUMER_TAG,
+                                                                 null,
+                                                                 new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
+
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -248,8 +278,8 @@ public class AckTest extends QpidTestCas
         for (long deliveryTag : deliveryTagSet)
         {
             assertTrue(deliveryTag == i);
-            QueueEntry unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.getQueue() == _queue);
+            MessageInstance unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.getOwningResource() == _queue);
             // 5 is the delivery tag of the message that *should* be removed
             if (++i == 5)
             {
@@ -264,7 +294,15 @@ public class AckTest extends QpidTestCas
      */
     public void testMultiAckReceivedTest() throws AMQException
     {
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+                                                                 DEFAULT_CONSUMER_TAG,
+                                                                 null,
+                                                                 new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
+
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -279,8 +317,8 @@ public class AckTest extends QpidTestCas
         for (long deliveryTag : deliveryTagSet)
         {
             assertTrue(deliveryTag == i + 5);
-            QueueEntry unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.getQueue() == _queue);
+            MessageInstance unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.getOwningResource() == _queue);
             ++i;
         }
     }
@@ -290,7 +328,15 @@ public class AckTest extends QpidTestCas
      */
     public void testMultiAckAllReceivedTest() throws AMQException
     {
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true,null,false, new LimitlessCreditManager());
+
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+                                                                 DEFAULT_CONSUMER_TAG,
+                                                                 null,
+                                                                 new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
+
         final int msgCount = 10;
         publishMessages(msgCount);
 
@@ -303,8 +349,8 @@ public class AckTest extends QpidTestCas
         for (long deliveryTag : deliveryTagSet)
         {
             assertTrue(deliveryTag == i + 5);
-            QueueEntry unackedMsg = map.get(deliveryTag);
-            assertTrue(unackedMsg.getQueue() == _queue);
+            MessageInstance unackedMsg = map.get(deliveryTag);
+            assertTrue(unackedMsg.getOwningResource() == _queue);
             ++i;
         }
     }
@@ -319,12 +365,16 @@ public class AckTest extends QpidTestCas
         // Send 10 messages
         Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
 
-        _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
-                                                                            DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
+
         final int msgCount = 1;
         publishMessages(msgCount);
 
-        _queue.deliverAsync(_subscription);
+        _consumer.externalStateChange();
 
         _channel.acknowledgeMessage(1, false);
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Fri Feb  7 16:57:49 2014
@@ -140,7 +140,7 @@ public class AcknowledgeTest extends Qpi
         assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
 
         //Subscribe to the queue
-        AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true);
+        AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true);
 
         getQueue().deliverAsync();
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Fri Feb  7 16:57:49 2014
@@ -23,20 +23,22 @@ package org.apache.qpid.server.protocol.
 import junit.framework.TestCase;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.queue.SimpleQueueEntryList;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
  *
@@ -59,40 +61,50 @@ public class ExtractResendAndRequeueTest
 
     private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
     private static final int INITIAL_MSG_COUNT = 10;
-    private AMQQueue _queue = new MockAMQQueue(getName());
-    private MessageStore _messageStore = new TestMemoryMessageStore();
-    private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
+    private AMQQueue _queue;
+    private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>();
+    private Consumer _consumer;
+    private boolean _queueDeleted;
 
     @Override
     public void setUp() throws AMQException
     {
+        _queueDeleted = false;
         _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100);
+        _queue = mock(AMQQueue.class);
+        when(_queue.getName()).thenReturn(getName());
+        when(_queue.isDeleted()).thenReturn(_queueDeleted);
+        _consumer = mock(Consumer.class);
+        when(_consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
+
 
         long id = 0;
-        SimpleQueueEntryList list = new SimpleQueueEntryList(_queue);
 
         // Add initial messages to QueueEntryList
         for (int count = 0; count < INITIAL_MSG_COUNT; count++)
         {
-            AMQMessage msg = new MockAMQMessage(id);
-
-            list.add(msg);
+            ServerMessage msg = mock(ServerMessage.class);
+            when(msg.getMessageNumber()).thenReturn(id);
+            final QueueEntry entry = mock(QueueEntry.class);
+            when(entry.getMessage()).thenReturn(msg);
+            when(entry.getQueue()).thenReturn(_queue);
+            when(entry.isQueueDeleted()).thenReturn(_queueDeleted);
+            doAnswer(new Answer()
+            {
+                @Override
+                public Object answer(final InvocationOnMock invocation) throws Throwable
+                {
+                    when(entry.isDeleted()).thenReturn(true);
+                    return null;
+                }
+            }).when(entry).delete();
 
+            _unacknowledgedMessageMap.add(id, entry);
+            _referenceList.add(entry);
             //Increment ID;
             id++;
         }
 
-        // Iterate through the QueueEntryList and add entries to unacknowledgedMessageMap and referenceList
-        QueueEntryIterator queueEntries = list.iterator();
-        while(queueEntries.advance())
-        {
-            QueueEntry entry = queueEntries.getNode();
-            _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
-
-            // Store the entry for future inspection
-            _referenceList.add(entry);
-        }
-
         assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size());
     }
 
@@ -103,17 +115,14 @@ public class ExtractResendAndRequeueTest
      *
      * @return Subscription that performed the acquire
      */
-    private Subscription createSubscriptionAndAcquireMessages(LinkedList<QueueEntry> messageList)
+    private void acquireMessages(LinkedList<MessageInstance> messageList)
     {
-        Subscription subscription = new MockSubscription();
 
-        // Aquire messages in subscription
-        for (QueueEntry entry : messageList)
+        // Acquire messages in subscription
+        for(MessageInstance entry : messageList)
         {
-            entry.acquire(subscription);
+            when(entry.getDeliveredConsumer()).thenReturn(_consumer);
         }
-
-        return subscription;
     }
 
     /**
@@ -128,14 +137,14 @@ public class ExtractResendAndRequeueTest
     public void testResend() throws AMQException
     {
         //We don't need the subscription object here.
-        createSubscriptionAndAcquireMessages(_referenceList);
+        acquireMessages(_referenceList);
 
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+        final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+        final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
 
         // requeueIfUnableToResend doesn't matter here.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, _messageStore));
+                                                                    msgToResend));
 
         assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
         assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -154,100 +163,22 @@ public class ExtractResendAndRequeueTest
      */
     public void testRequeueDueToSubscriptionClosure() throws AMQException
     {
-        Subscription subscription = createSubscriptionAndAcquireMessages(_referenceList);
+        acquireMessages(_referenceList);
 
         // Close subscription
-        subscription.close();
+        when(_consumer.isClosed()).thenReturn(true);
 
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+        final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+        final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
 
         // requeueIfUnableToResend doesn't matter here.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, _messageStore));
-
-        assertEquals("Message count for resend not correct.", 0, msgToResend.size());
-        assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
-        assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-    }
-
-    /**
-     * If the subscription is null, due to message being retrieved via a GET, And we request that messages are requeued
-     * requeueIfUnableToResend(set to true) then all messages should be sent to the msgToRequeue map.
-     *
-     * @throws AMQException the visit interface throws this
-     */
-
-    public void testRequeueDueToMessageHavingNoConsumerTag() throws AMQException
-    {
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
-        // requeueIfUnableToResend = true so all messages should go to msgToRequeue
-        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, _messageStore));
+                                                                    msgToResend));
 
         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
         assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
         assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
     }
 
-    /**
-     * If the subscription is null, due to message being retrieved via a GET, And we request that we don't
-     * requeueIfUnableToResend(set to false) then all messages should be dropped as we do not have a dead letter queue.
-     *
-     * @throws AMQException the visit interface throws this
-     */
-
-    public void testDrop() throws AMQException
-    {
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
-        // requeueIfUnableToResend = false so all messages should be dropped all maps should be empty
-        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, false, _messageStore));
-
-        assertEquals("Message count for resend not correct.", 0, msgToResend.size());
-        assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
-        assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
-
-        for (QueueEntry entry : _referenceList)
-        {
-            assertTrue("Message was not discarded", entry.isDeleted());
-        }
-
-    }
-
-    /**
-     * If the subscription is null, due to message being retrieved via a GET, AND the queue upon which the message was
-     * delivered has been deleted then it is not possible to requeue. Currently we simply discard the message but in the
-     * future we may wish to dead letter the message.
-     *
-     * Validate that at the end of the visit all Maps are empty and all messages are marked as deleted
-     *
-     * @throws AMQException the visit interface throws this
-     */
-    public void testDiscard() throws AMQException
-    {
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
-
-        _queue.delete();
-
-        // requeueIfUnableToResend : value doesn't matter here as queue has been deleted
-        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, false, _messageStore));
-
-        assertEquals("Message count for resend not correct.", 0, msgToResend.size());
-        assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
-        assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
-
-        for (QueueEntry entry : _referenceList)
-        {
-            assertTrue("Message was not discarded", entry.isDeleted());
-        }
-    }
 
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Fri Feb  7 16:57:49 2014
@@ -47,11 +47,9 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -60,7 +58,7 @@ public class InternalTestProtocolSession
 {
     private static final Logger _logger = Logger.getLogger(InternalTestProtocolSession.class);
     // ChannelID(LIST)  -> LinkedList<Pair>
-    private final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
+    private final Map<Integer, Map<String, LinkedList<DeliveryPair>>> _channelDelivers;
     private AtomicInteger _deliveryCount = new AtomicInteger(0);
     private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
 
@@ -68,7 +66,7 @@ public class InternalTestProtocolSession
     {
         super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null);
 
-        _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
+        _channelDelivers = new HashMap<Integer, Map<String, LinkedList<DeliveryPair>>>();
 
         setTestAuthorizedSubject();
         setVirtualHost(virtualHost);
@@ -117,7 +115,7 @@ public class InternalTestProtocolSession
     {
         synchronized (_channelDelivers)
         {
-            List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
+            List<DeliveryPair> all =_channelDelivers.get(channelId).get(AMQShortString.toString(consumerTag));
 
             if (all == null)
             {
@@ -153,23 +151,23 @@ public class InternalTestProtocolSession
 
         synchronized (_channelDelivers)
         {
-            Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
+            Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
 
             if (consumers == null)
             {
-                consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+                consumers = new HashMap<String, LinkedList<DeliveryPair>>();
                 _channelDelivers.put(channelId, consumers);
             }
 
-            LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag);
+            LinkedList<DeliveryPair> consumerDelivers = consumers.get(AMQShortString.toString(consumerTag));
 
             if (consumerDelivers == null)
             {
                 consumerDelivers = new LinkedList<DeliveryPair>();
-                consumers.put(consumerTag, consumerDelivers);
+                consumers.put(consumerTag.toString(), consumerDelivers);
             }
 
-            consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)msg));
+            consumerDelivers.add(new DeliveryPair(deliveryTag, msg));
         }
     }
 
@@ -247,27 +245,27 @@ public class InternalTestProtocolSession
 
 
         @Override
-        public void deliverToClient(Subscription sub, ServerMessage message,
+        public void deliverToClient(Consumer sub, ServerMessage message,
                                     InstanceProperties props, long deliveryTag) throws AMQException
         {
             _deliveryCount.incrementAndGet();
 
             synchronized (_channelDelivers)
             {
-                Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
+                Map<String, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(_channelId);
 
                 if (consumers == null)
                 {
-                    consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+                    consumers = new HashMap<String, LinkedList<DeliveryPair>>();
                     _channelDelivers.put(_channelId, consumers);
                 }
 
-                LinkedList<DeliveryPair> consumerDelivers = consumers.get(((SubscriptionImpl)sub).getConsumerTag());
+                LinkedList<DeliveryPair> consumerDelivers = consumers.get(sub.getName());
 
                 if (consumerDelivers == null)
                 {
                     consumerDelivers = new LinkedList<DeliveryPair>();
-                    consumers.put(((SubscriptionImpl)sub).getConsumerTag(), consumerDelivers);
+                    consumers.put(sub.getName(), consumerDelivers);
                 }
 
                 consumerDelivers.add(new DeliveryPair(deliveryTag, message));

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Fri Feb  7 16:57:49 2014
@@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.SimpleAMQQueue;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -130,8 +130,7 @@ public class QueueBrowserUsesNoAckTest e
         //Check the process didn't suspend the subscription as this would
         // indicate we are using the prefetch credit. i.e. using acks not No-Ack
         assertTrue("The subscription has been suspended",
-                   !getChannel().getSubscription(browser).getState()
-                           .equals(Subscription.State.SUSPENDED));
+                   !getChannel().getSubscription(browser).isSuspended());
     }
 
     private void checkStoreContents(int messageCount)
@@ -144,6 +143,6 @@ public class QueueBrowserUsesNoAckTest e
         FieldTable filters = new FieldTable();
         filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
 
-        return channel.subscribeToQueue(null, queue, true, filters, false, true);
+        return channel.consumeFromSource(null, queue, true, filters, true);
     }
 }

Propchange: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
            ('svn:mergeinfo' removed)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Fri Feb  7 16:57:49 2014
@@ -34,6 +34,7 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.ArrayList;
@@ -53,16 +54,8 @@ public class Connection_1_0 implements C
     private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
     private final Object _reference = new Object();
 
-
-
-    public static interface Task
-    {
-        public void doTask(Connection_1_0 connection);
-    }
-
-
-    private List<Task> _closeTasks =
-            Collections.synchronizedList(new ArrayList<Task>());
+    private List<Action<Connection_1_0>> _closeTasks =
+            Collections.synchronizedList(new ArrayList<Action<Connection_1_0>>());
 
 
 
@@ -98,26 +91,26 @@ public class Connection_1_0 implements C
         _sessions.remove(session);
     }
 
-    void removeConnectionCloseTask(final Task task)
+    void removeConnectionCloseTask(final Action<Connection_1_0> task)
     {
         _closeTasks.remove( task );
     }
 
-    void addConnectionCloseTask(final Task task)
+    void addConnectionCloseTask(final Action<Connection_1_0> task)
     {
         _closeTasks.add( task );
     }
 
     public void closeReceived()
     {
-        List<Task> taskCopy;
+        List<Action<Connection_1_0>> taskCopy;
         synchronized (_closeTasks)
         {
-            taskCopy = new ArrayList<Task>(_closeTasks);
+            taskCopy = new ArrayList<Action<Connection_1_0>>(_closeTasks);
         }
-        for(Task task : taskCopy)
+        for(Action<Connection_1_0> task : taskCopy)
         {
-            task.doTask(this);
+            task.performAction(this);
         }
         synchronized (_closeTasks)
         {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Fri Feb  7 16:57:49 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
 import java.io.EOFException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.ListIterator;
@@ -286,7 +285,7 @@ public abstract class MessageConverter_t
         Binary dataEncoding = sectionEncoder.getEncoding();
 
         final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength());
-        metaData.writeToBuffer(0,allData);
+        metaData.writeToBuffer(allData);
         allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength());
         return allData;
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Fri Feb  7 16:57:49 2014
@@ -314,7 +314,7 @@ public class MessageMetaData_1_0 impleme
         return buf;
     }
 
-    public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+    public int writeToBuffer(ByteBuffer dest)
     {
         ByteBuffer buf = _encoded;
 
@@ -326,7 +326,7 @@ public class MessageMetaData_1_0 impleme
 
         buf = buf.duplicate();
 
-        buf.position(offsetInMetaData);
+        buf.position(0);
 
         if(dest.remaining() < buf.limit())
         {

Propchange: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
            ('svn:mergeinfo' removed)

Propchange: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
            ('svn:mergeinfo' removed)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Fri Feb  7 16:57:49 2014
@@ -24,22 +24,21 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.amqp_1_0.type.Outcome;
 import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
 
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.queue.AMQQueue;
 
 import org.apache.qpid.server.txn.ServerTransaction;
 
-public class QueueDestination implements SendingDestination, ReceivingDestination
+public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination
 {
     private static final Logger _logger = Logger.getLogger(QueueDestination.class);
     private static final Accepted ACCEPTED = new Accepted();
     private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
 
 
-    private AMQQueue _queue;
-
     public QueueDestination(AMQQueue queue)
     {
-        _queue = queue;
+        super(queue);
     }
 
     public Outcome[] getOutcomes()
@@ -52,7 +51,7 @@ public class QueueDestination implements
 
         try
         {
-            txn.enqueue(_queue,message, new ServerTransaction.Action()
+            txn.enqueue(getQueue(),message, new ServerTransaction.Action()
             {
 
 
@@ -60,8 +59,7 @@ public class QueueDestination implements
                 {
                     try
                     {
-
-                        _queue.enqueue(message);
+                        getQueue().enqueue(message,null);
                     }
                     catch (Exception e)
                     {
@@ -93,7 +91,7 @@ public class QueueDestination implements
 
     public AMQQueue getQueue()
     {
-        return _queue;
+        return (AMQQueue) super.getQueue();
     }
 
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Fri Feb  7 16:57:49 2014
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,11 +65,14 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.exchange.TopicExchange;
 import org.apache.qpid.server.filter.JMSSelectorFilter;
 import org.apache.qpid.server.filter.SimpleFilterManager;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
@@ -78,18 +82,22 @@ public class SendingLink_1_0 implements 
     private VirtualHost _vhost;
     private SendingDestination _destination;
 
-    private Subscription_1_0 _subscription;
+    private Consumer _consumer;
+    private ConsumerTarget_1_0 _target;
+
     private boolean _draining;
-    private final Map<Binary, QueueEntry> _unsettledMap =
-            new HashMap<Binary, QueueEntry>();
+    private final Map<Binary, MessageInstance> _unsettledMap =
+            new HashMap<Binary, MessageInstance>();
 
     private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap =
             new ConcurrentHashMap<Binary, UnsettledAction>();
     private volatile SendingLinkAttachment _linkAttachment;
     private TerminusDurability _durability;
-    private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>();
+    private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>();
     private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
     private Runnable _closeAction;
+    private final MessageSource _queue;
+
 
     public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
                            final VirtualHost vhost,
@@ -103,24 +111,22 @@ public class SendingLink_1_0 implements 
         _durability = source.getDurable();
         linkAttachment.setDeliveryStateHandler(this);
         QueueDestination qd = null;
-        AMQQueue queue = null;
 
+        EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
 
 
         boolean noLocal = false;
         JMSSelectorFilter messageFilter = null;
 
-        if(destination instanceof QueueDestination)
+        if(destination instanceof MessageSourceDestination)
         {
-            queue = ((QueueDestination) _destination).getQueue();
+            _queue = ((MessageSourceDestination) _destination).getQueue();
 
-            if(queue.getAvailableAttributes().contains("topic"))
+            if(_queue instanceof AMQQueue && ((AMQQueue)_queue).getAvailableAttributes().contains("topic"))
             {
                 source.setDistributionMode(StdDistMode.COPY);
             }
 
-            qd = (QueueDestination) destination;
-
             Map<Symbol,Filter> filters = source.getFilter();
 
             Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
@@ -167,7 +173,13 @@ public class SendingLink_1_0 implements 
             }
             source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
 
-            _subscription = new Subscription_1_0(this, qd, source.getDistributionMode() != StdDistMode.COPY);
+            _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
+            if(source.getDistributionMode() != StdDistMode.COPY)
+            {
+                options.add(Consumer.Option.ACQUIRES);
+                options.add(Consumer.Option.SEES_REQUEUES);
+            }
+
         }
         else if(destination instanceof ExchangeDestination)
         {
@@ -199,7 +211,7 @@ public class SendingLink_1_0 implements 
                     name = UUID.randomUUID().toString();
                 }
 
-                queue = _vhost.getQueue(name);
+                AMQQueue queue = _vhost.getQueue(name);
                 Exchange exchange = exchangeDestination.getExchange();
 
                 if(queue == null)
@@ -299,9 +311,10 @@ public class SendingLink_1_0 implements 
                         }
                     }
                 }
+                _queue = queue;
                 source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
 
-                exchange.addBinding(binding,queue,null);
+                exchange.addBinding(binding, queue,null);
                 source.setDistributionMode(StdDistMode.COPY);
 
                 if(!isDurable)
@@ -309,10 +322,10 @@ public class SendingLink_1_0 implements 
                     final String queueName = name;
                     final AMQQueue tempQueue = queue;
 
-                    final Connection_1_0.Task deleteQueueTask =
-                                            new Connection_1_0.Task()
+                    final Action<Connection_1_0> deleteQueueTask =
+                                            new Action<Connection_1_0>()
                                             {
-                                                public void doTask(Connection_1_0 session)
+                                                public void performAction(Connection_1_0 session)
                                                 {
                                                     if (_vhost.getQueue(queueName) == tempQueue)
                                                     {
@@ -331,9 +344,9 @@ public class SendingLink_1_0 implements 
 
                                     getSession().getConnection().addConnectionCloseTask(deleteQueueTask);
 
-                                    queue.addQueueDeleteTask(new AMQQueue.Task()
+                                    queue.addQueueDeleteTask(new Action<AMQQueue>()
                                     {
-                                        public void doTask(AMQQueue queue)
+                                        public void performAction(AMQQueue queue)
                                         {
                                             getSession().getConnection().removeConnectionCloseTask(deleteQueueTask);
                                         }
@@ -347,31 +360,46 @@ public class SendingLink_1_0 implements 
             catch (AMQSecurityException e)
             {
                 _logger.error("Security error", e);
+                throw new RuntimeException(e);
             }
             catch (AMQInternalException e)
             {
                 _logger.error("Internal error", e);
+                throw new RuntimeException(e);
             }
             catch (AMQException e)
             {
                 _logger.error("Error", e);
+                throw new RuntimeException(e);
             }
-            _subscription = new Subscription_1_0(this, qd, true);
 
+
+            _target = new ConsumerTarget_1_0(this, true);
+            options.add(Consumer.Option.ACQUIRES);
+            options.add(Consumer.Option.SEES_REQUEUES);
+
+        }
+        else
+        {
+            throw new RuntimeException("Unknown destination type");
         }
 
-        if(_subscription != null)
+        if(_target != null)
         {
-            _subscription.setNoLocal(noLocal);
-            if(messageFilter!=null)
+            if(noLocal)
             {
-                _subscription.setFilters(new SimpleFilterManager(messageFilter));
+                options.add(Consumer.Option.NO_LOCAL);
             }
 
+
+
             try
             {
-
-                queue.registerSubscription(_subscription, false);
+                _consumer = _queue.addConsumer(_target,
+                                               messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+                                               Message_1_0.class, getEndpoint().getName(), options);
+                // _consumer is null until addConsumer returns, so noLocal can only be applied after the assignment
+                _consumer.setNoLocal(noLocal);
             }
             catch (AMQException e)
             {
@@ -394,12 +422,11 @@ public class SendingLink_1_0 implements 
         // if not durable or close
         if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
         {
-            AMQQueue queue = _subscription.getQueue();
 
             try
             {
 
-                queue.unregisterSubscription(_subscription);
+                _consumer.close();
 
             }
             catch (AMQException e)
@@ -426,7 +453,7 @@ public class SendingLink_1_0 implements 
             {
                 try
                 {
-                    queue.getVirtualHost().removeQueue(queue);
+                    _vhost.removeQueue((AMQQueue)_queue);
                 }
                 catch(AMQException e)
                 {
@@ -443,7 +470,7 @@ public class SendingLink_1_0 implements 
         else if(detach == null || detach.getError() != null)
         {
             _linkAttachment = null;
-            _subscription.flowStateChanged();
+            _target.flowStateChanged();
         }
         else
         {
@@ -491,7 +518,7 @@ public class SendingLink_1_0 implements 
         }
         if(_resumeAcceptedTransfers.isEmpty())
         {
-            _subscription.flowStateChanged();
+            _target.flowStateChanged();
         }
 
     }
@@ -531,7 +558,7 @@ public class SendingLink_1_0 implements 
         }
     }
 
-    public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry)
+    public void addUnsettled(Binary tag, UnsettledAction unsettledAction, MessageInstance queueEntry)
     {
         _unsettledActionMap.put(tag,unsettledAction);
         if(getTransactionId() == null)
@@ -593,9 +620,9 @@ public class SendingLink_1_0 implements 
     public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
     {
 
-        if(_subscription.isActive())
+        if(_consumer.isActive())
         {
-            _subscription.suspend();
+            _target.suspend();
         }
 
         _linkAttachment = linkAttachment;
@@ -603,14 +630,14 @@ public class SendingLink_1_0 implements 
         SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
         endpoint.setDeliveryStateHandler(this);
         Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
-        Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap);
+        Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap);
         _resumeAcceptedTransfers.clear();
         _resumeFullTransfers.clear();
 
-        for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet())
+        for(Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
         {
             Binary deliveryTag = entry.getKey();
-            final QueueEntry queueEntry = entry.getValue();
+            final MessageInstance queueEntry = entry.getValue();
             if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
             {
                 queueEntry.setRedelivered();
@@ -624,7 +651,7 @@ public class SendingLink_1_0 implements 
                 if(outcome instanceof Accepted)
                 {
                     AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
-                    if(_subscription.acquires())
+                    if(_consumer.acquires())
                     {
                         txn.dequeue(Collections.singleton(queueEntry),
                                 new ServerTransaction.Action()
@@ -644,7 +671,7 @@ public class SendingLink_1_0 implements 
                 else if(outcome instanceof Released)
                 {
                     AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
-                    if(_subscription.acquires())
+                    if(_consumer.acquires())
                     {
                         txn.dequeue(Collections.singleton(queueEntry),
                                 new ServerTransaction.Action()
@@ -678,9 +705,9 @@ public class SendingLink_1_0 implements 
 
     public Map getUnsettledOutcomeMap()
     {
-        Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap);
+        Map<Binary, MessageInstance> unsettled = new HashMap<Binary, MessageInstance>(_unsettledMap);
 
-        for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet())
+        for(Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
         {
             entry.setValue(null);
         }
@@ -692,4 +719,9 @@ public class SendingLink_1_0 implements 
     {
         _closeAction = action;
     }
+
+    public VirtualHost getVirtualHost()
+    {
+        return _vhost;
+    }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Feb  7 16:57:49 2014
@@ -41,6 +41,8 @@ import org.apache.qpid.AMQSecurityExcept
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -48,6 +50,7 @@ import org.apache.qpid.server.protocol.L
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.*;
@@ -108,11 +111,11 @@ public class Session_1_0 implements Sess
                         source.setAddress(tempQueue.getName());
                     }
                     String addr = source.getAddress();
-                    AMQQueue queue = _vhost.getQueue(addr);
+                    MessageSource queue = _vhost.getMessageSource(addr);
                     if(queue != null)
                     {
 
-                        destination = new QueueDestination(queue);
+                        destination = new MessageSourceDestination(queue);
 
 
 
@@ -249,11 +252,11 @@ public class Session_1_0 implements Sess
                         }
 
                         String addr = target.getAddress();
-                        Exchange exchg = _vhost.getExchange(addr);
-                        if(exchg != null)
+                        MessageDestination messageDestination = _vhost.getMessageDestination(addr);
+                        if(messageDestination != null)
                         {
-                            destination = new ExchangeDestination(exchg, target.getDurable(),
-                                                                  target.getExpiryPolicy());
+                            destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
+                                                                       target.getExpiryPolicy());
                         }
                         else
                         {
@@ -343,10 +346,10 @@ public class Session_1_0 implements Sess
 
             if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
             {
-                final Connection_1_0.Task deleteQueueTask =
-                        new Connection_1_0.Task()
+                final Action<Connection_1_0> deleteQueueTask =
+                        new Action<Connection_1_0>()
                         {
-                            public void doTask(Connection_1_0 session)
+                            public void performAction(Connection_1_0 session)
                             {
                                 if (_vhost.getQueue(queueName) == tempQueue)
                                 {
@@ -365,9 +368,9 @@ public class Session_1_0 implements Sess
 
                 _connection.addConnectionCloseTask(deleteQueueTask);
 
-                queue.addQueueDeleteTask(new AMQQueue.Task()
+                queue.addQueueDeleteTask(new Action<AMQQueue>()
                 {
-                    public void doTask(AMQQueue queue)
+                    public void performAction(AMQQueue queue)
                     {
                         _connection.removeConnectionCloseTask(deleteQueueTask);
                     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org