You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/05 11:29:57 UTC

svn commit: r1564703 [4/4] - in /qpid/branches/java-broker-amqp-1-0-management/java: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/logging/actors/ broker-core/src/main/java/org/apache/qpid/s...

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java Wed Feb  5 10:29:55 2014
@@ -23,10 +23,10 @@ package org.apache.qpid.server.protocol.
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 
 public interface ClientDeliveryMethod
 {
-    void deliverToClient(final Subscription sub, final ServerMessage message, final InstanceProperties props,
+    void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props,
                          final long deliveryTag) throws AMQException;
 }

Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (from r1564581, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java&r1=1564581&r2=1564703&rev=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Wed Feb  5 10:29:55 2014
@@ -33,8 +33,8 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.Atomi
  * Encapsulation of a subscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
  * that was given out by the broker and the channel id. <p/>
  */
-public abstract class SubscriptionTarget_0_8 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener
+public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
 {
 
     private final StateChangeListener<QueueEntry, QueueEntry.State> _entryReleaseListener =
@@ -70,23 +70,23 @@ public abstract class SubscriptionTarget
 
     private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
     private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
-    private Subscription _subscription;
+    private Consumer _consumer;
 
 
-    public static SubscriptionTarget_0_8 createBrowserTarget(AMQChannel channel,
+    public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
                                                              AMQShortString consumerTag, FieldTable filters,
                                                              FlowCreditManager creditManager) throws AMQException
     {
-        return new BrowserSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+        return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
     }
 
-    static final class BrowserSubscription extends SubscriptionTarget_0_8
+    static final class BrowserConsumer extends ConsumerTarget_0_8
     {
-        public BrowserSubscription(AMQChannel channel,
-                                   AMQShortString consumerTag, FieldTable filters,
-                                   FlowCreditManager creditManager,
-                                   ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod)
+        public BrowserConsumer(AMQChannel channel,
+                               AMQShortString consumerTag, FieldTable filters,
+                               FlowCreditManager creditManager,
+                               ClientDeliveryMethod deliveryMethod,
+                               RecordDeliveryMethod recordMethod)
             throws AMQException
         {
             super(channel, consumerTag,
@@ -124,31 +124,31 @@ public abstract class SubscriptionTarget
 
     }
 
-    public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel,
+    public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
                                                            AMQShortString consumerTag, FieldTable filters,
                                                            FlowCreditManager creditManager) throws AMQException
     {
-        return new NoAckSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+        return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
     }
 
-    public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel,
+    public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
                                                            AMQShortString consumerTag, FieldTable filters,
                                                            FlowCreditManager creditManager,
                                                            ClientDeliveryMethod deliveryMethod,
                                                            RecordDeliveryMethod recordMethod) throws AMQException
     {
-        return new NoAckSubscription(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+        return new NoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
     }
 
-    public static class NoAckSubscription extends SubscriptionTarget_0_8
+    public static class NoAckConsumer extends ConsumerTarget_0_8
     {
         private final AutoCommitTransaction _txn;
 
-        public NoAckSubscription(AMQChannel channel,
-                                 AMQShortString consumerTag, FieldTable filters,
-                                 FlowCreditManager creditManager,
-                                 ClientDeliveryMethod deliveryMethod,
-                                 RecordDeliveryMethod recordMethod)
+        public NoAckConsumer(AMQChannel channel,
+                             AMQShortString consumerTag, FieldTable filters,
+                             FlowCreditManager creditManager,
+                             ClientDeliveryMethod deliveryMethod,
+                             RecordDeliveryMethod recordMethod)
             throws AMQException
         {
             super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
@@ -221,13 +221,13 @@ public abstract class SubscriptionTarget
     /**
      * NoAck Subscription for use with BasicGet method.
      */
-    public static final class GetNoAckSubscription extends SubscriptionTarget_0_8.NoAckSubscription
+    public static final class GetNoAckConsumer extends NoAckConsumer
     {
-        public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
-                                    AMQShortString consumerTag, FieldTable filters,
-                                    boolean noLocal, FlowCreditManager creditManager,
-                                    ClientDeliveryMethod deliveryMethod,
-                                    RecordDeliveryMethod recordMethod)
+        public GetNoAckConsumer(AMQChannel channel, AMQProtocolSession protocolSession,
+                                AMQShortString consumerTag, FieldTable filters,
+                                boolean noLocal, FlowCreditManager creditManager,
+                                ClientDeliveryMethod deliveryMethod,
+                                RecordDeliveryMethod recordMethod)
             throws AMQException
         {
             super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
@@ -241,32 +241,32 @@ public abstract class SubscriptionTarget
     }
 
 
-    public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel,
+    public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
                                                          AMQShortString consumerTag, FieldTable filters,
                                                          FlowCreditManager creditManager)
             throws AMQException
     {
-        return new AckSubscription(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+        return new AckConsumer(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
     }
 
 
-    public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel,
+    public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
                                                          AMQShortString consumerTag, FieldTable filters,
                                                          FlowCreditManager creditManager,
                                                          ClientDeliveryMethod deliveryMethod,
                                                          RecordDeliveryMethod recordMethod)
             throws AMQException
     {
-        return new AckSubscription(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
+        return new AckConsumer(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
     }
 
-    static final class AckSubscription extends SubscriptionTarget_0_8
+    static final class AckConsumer extends ConsumerTarget_0_8
     {
-        public AckSubscription(AMQChannel channel,
-                               AMQShortString consumerTag, FieldTable filters,
-                               FlowCreditManager creditManager,
-                               ClientDeliveryMethod deliveryMethod,
-                               RecordDeliveryMethod recordMethod)
+        public AckConsumer(AMQChannel channel,
+                           AMQShortString consumerTag, FieldTable filters,
+                           FlowCreditManager creditManager,
+                           ClientDeliveryMethod deliveryMethod,
+                           RecordDeliveryMethod recordMethod)
             throws AMQException
         {
             super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
@@ -305,7 +305,7 @@ public abstract class SubscriptionTarget
     }
 
 
-    private static final Logger _logger = Logger.getLogger(SubscriptionTarget_0_8.class);
+    private static final Logger _logger = Logger.getLogger(ConsumerTarget_0_8.class);
 
     private final AMQChannel _channel;
 
@@ -320,12 +320,12 @@ public abstract class SubscriptionTarget
 
 
 
-    public SubscriptionTarget_0_8(AMQChannel channel,
-                                  AMQShortString consumerTag,
-                                  FieldTable arguments,
-                                  FlowCreditManager creditManager,
-                                  ClientDeliveryMethod deliveryMethod,
-                                  RecordDeliveryMethod recordMethod)
+    public ConsumerTarget_0_8(AMQChannel channel,
+                              AMQShortString consumerTag,
+                              FieldTable arguments,
+                              FlowCreditManager creditManager,
+                              ClientDeliveryMethod deliveryMethod,
+                              RecordDeliveryMethod recordMethod)
             throws AMQException
     {
         super(State.ACTIVE);
@@ -357,20 +357,20 @@ public abstract class SubscriptionTarget
         }
     }
 
-    public Subscription getSubscription()
+    public Consumer getConsumer()
     {
-        return _subscription;
+        return _consumer;
     }
 
     @Override
-    public void subscriptionRemoved(final Subscription sub)
+    public void consumerRemoved(final Consumer sub)
     {
     }
 
     @Override
-    public void subscriptionRegistered(final Subscription sub)
+    public void consumerAdded(final Consumer sub)
     {
-        _subscription = sub;
+        _consumer = sub;
     }
 
     public AMQSessionModel getSessionModel()
@@ -417,7 +417,7 @@ public abstract class SubscriptionTarget
         boolean closed = false;
         State state = getState();
 
-        getSubscription().getSendLock();
+        getConsumer().getSendLock();
         try
         {
             while(!closed && state != State.CLOSED)
@@ -433,7 +433,7 @@ public abstract class SubscriptionTarget
         }
         finally
         {
-            getSubscription().releaseSendLock();
+            getConsumer().releaseSendLock();
         }
     }
 
@@ -488,14 +488,14 @@ public abstract class SubscriptionTarget
     protected void sendToClient(final ServerMessage message, final InstanceProperties props, final long deliveryTag)
             throws AMQException
     {
-        _deliveryMethod.deliverToClient(getSubscription(), message, props, deliveryTag);
+        _deliveryMethod.deliverToClient(getConsumer(), message, props, deliveryTag);
 
     }
 
 
     protected void recordMessageDelivery(final MessageInstance entry, final long deliveryTag)
     {
-        _recordMethod.recordMessageDelivery(getSubscription(),entry,deliveryTag);
+        _recordMethod.recordMessageDelivery(getConsumer(),entry,deliveryTag);
     }
 
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Wed Feb  5 10:29:55 2014
@@ -24,11 +24,7 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.consumer.Consumer;
 
 import java.util.Map;
 
@@ -53,11 +49,11 @@ public class ExtractResendAndRequeue imp
     {
 
         message.setRedelivered();
-        final Subscription subscription = message.getDeliveredSubscription();
-        if (subscription != null)
+        final Consumer consumer = message.getDeliveredConsumer();
+        if (consumer != null)
         {
             // Consumer exists
-            if (!subscription.isClosed())
+            if (!consumer.isClosed())
             {
                 _msgToResend.put(deliveryTag, message);
             }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java Wed Feb  5 10:29:55 2014
@@ -21,9 +21,9 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 
 public interface RecordDeliveryMethod
 {
-    void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag);
+    void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag);
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Wed Feb  5 10:29:55 2014
@@ -156,14 +156,14 @@ public class BasicConsumeMethodHandler i
 
 
                 }
-                catch (AMQQueue.ExistingExclusiveSubscription e)
+                catch (AMQQueue.ExistingExclusiveConsumer e)
                 {
                     throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
                                                    "Cannot subscribe to queue "
                                                    + queue.getName()
                                                    + " as it already has an existing exclusive consumer");
                 }
-                catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+                catch (AMQQueue.ExistingConsumerPreventsExclusive e)
                 {
                     throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
                                                    "Cannot subscribe to queue "

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Wed Feb  5 10:29:55 2014
@@ -38,14 +38,13 @@ import org.apache.qpid.server.flow.Messa
 import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionTarget_0_8;
+import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
 import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.EnumSet;
@@ -133,7 +132,7 @@ public class BasicGetMethodHandler imple
         {
 
             @Override
-            public void deliverToClient(final Subscription sub, final ServerMessage message, final
+            public void deliverToClient(final Consumer sub, final ServerMessage message, final
                                         InstanceProperties props, final long deliveryTag)
             throws AMQException
             {
@@ -150,30 +149,30 @@ public class BasicGetMethodHandler imple
         final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
         {
 
-            public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag)
+            public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
             {
                 channel.addUnacknowledgedMessage(entry, deliveryTag, null);
             }
         };
 
-        SubscriptionTarget_0_8 target;
-        EnumSet<Subscription.Option> options = EnumSet.of(Subscription.Option.TRANSIENT, Subscription.Option.ACQUIRES,
-                                                          Subscription.Option.SEES_REQUEUES);
+        ConsumerTarget_0_8 target;
+        EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES,
+                                                          Consumer.Option.SEES_REQUEUES);
         if(acks)
         {
 
-            target = SubscriptionTarget_0_8.createAckTarget(channel,
-                                                            AMQShortString.EMPTY_STRING, null,
-                                                            singleMessageCredit, getDeliveryMethod, getRecordMethod);
+            target = ConsumerTarget_0_8.createAckTarget(channel,
+                                                        AMQShortString.EMPTY_STRING, null,
+                                                        singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
         else
         {
-            target = SubscriptionTarget_0_8.createNoAckTarget(channel,
-                                                              AMQShortString.EMPTY_STRING, null,
-                                                              singleMessageCredit, getDeliveryMethod, getRecordMethod);
+            target = ConsumerTarget_0_8.createNoAckTarget(channel,
+                                                          AMQShortString.EMPTY_STRING, null,
+                                                          singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
 
-        Subscription sub = queue.registerSubscription(target, null, AMQMessage.class, "", options);
+        Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
         sub.flush();
         sub.close();
         return(!singleMessageCredit.hasCredit());

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Wed Feb  5 10:29:55 2014
@@ -30,10 +30,9 @@ import org.apache.qpid.server.flow.Limit
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.BrokerTestHelper;
@@ -49,8 +48,8 @@ import java.util.Set;
  */
 public class AckTest extends QpidTestCase
 {
-    private SubscriptionTarget_0_8 _subscriptionTarget;
-    private Subscription _subscription;
+    private ConsumerTarget_0_8 _subscriptionTarget;
+    private Consumer _consumer;
 
     private AMQProtocolSession _protocolSession;
 
@@ -180,10 +179,13 @@ public class AckTest extends QpidTestCas
      */
     public void testAckChannelAssociationTest() throws AMQException
     {
-        _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, new LimitlessCreditManager());
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
-                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
-                                                               Subscription.Option.ACQUIRES));
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+                                                                 DEFAULT_CONSUMER_TAG,
+                                                                 null,
+                                                                 new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
         final int msgCount = 10;
         publishMessages(msgCount, true);
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -207,16 +209,16 @@ public class AckTest extends QpidTestCas
     public void testNoAckMode() throws AMQException
     {
         // false arg means no acks expected
-        _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
-                                                                       DEFAULT_CONSUMER_TAG,
-                                                                       null,
-                                                                       new LimitlessCreditManager());
-        _subscription = _queue.registerSubscription(_subscriptionTarget,
-                                                    null,
-                                                    AMQMessage.class,
-                                                    DEFAULT_CONSUMER_TAG.toString(),
-                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
-                                                               Subscription.Option.ACQUIRES));
+        _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+                                                                   DEFAULT_CONSUMER_TAG,
+                                                                   null,
+                                                                   new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget,
+                                       null,
+                                       AMQMessage.class,
+                                       DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
         final int msgCount = 10;
         publishMessages(msgCount);
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
@@ -233,12 +235,12 @@ public class AckTest extends QpidTestCas
     {
         // false arg means no acks expected
 
-        _subscriptionTarget = SubscriptionTarget_0_8.createNoAckTarget(_channel,
-                                                                       DEFAULT_CONSUMER_TAG,
-                                                                       null,
-                                                                       new LimitlessCreditManager());
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
-                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES, Subscription.Option.ACQUIRES));
+        _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel,
+                                                                   DEFAULT_CONSUMER_TAG,
+                                                                   null,
+                                                                   new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES));
         final int msgCount = 10;
         publishMessages(msgCount, true);
 
@@ -256,13 +258,13 @@ public class AckTest extends QpidTestCas
     public void testSingleAckReceivedTest() throws AMQException
     {
 
-        _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
-                                                                     DEFAULT_CONSUMER_TAG,
-                                                                     null,
-                                                                     new LimitlessCreditManager());
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
-                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
-                                                               Subscription.Option.ACQUIRES));
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+                                                                 DEFAULT_CONSUMER_TAG,
+                                                                 null,
+                                                                 new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
 
         final int msgCount = 10;
         publishMessages(msgCount);
@@ -293,13 +295,13 @@ public class AckTest extends QpidTestCas
     public void testMultiAckReceivedTest() throws AMQException
     {
 
-        _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
-                                                                     DEFAULT_CONSUMER_TAG,
-                                                                     null,
-                                                                     new LimitlessCreditManager());
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
-                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
-                                                               Subscription.Option.ACQUIRES));
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+                                                                 DEFAULT_CONSUMER_TAG,
+                                                                 null,
+                                                                 new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
 
         final int msgCount = 10;
         publishMessages(msgCount);
@@ -327,13 +329,13 @@ public class AckTest extends QpidTestCas
     public void testMultiAckAllReceivedTest() throws AMQException
     {
 
-        _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel,
-                                                                     DEFAULT_CONSUMER_TAG,
-                                                                     null,
-                                                                     new LimitlessCreditManager());
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
-                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
-                                                               Subscription.Option.ACQUIRES));
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel,
+                                                                 DEFAULT_CONSUMER_TAG,
+                                                                 null,
+                                                                 new LimitlessCreditManager());
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
 
         final int msgCount = 10;
         publishMessages(msgCount);
@@ -364,15 +366,15 @@ public class AckTest extends QpidTestCas
         Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
 
 
-        _subscriptionTarget = SubscriptionTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
-                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
-                                                               Subscription.Option.ACQUIRES));
+        _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager);
+        _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(),
+                                       EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                  Consumer.Option.ACQUIRES));
 
         final int msgCount = 1;
         publishMessages(msgCount);
 
-        _subscription.externalStateChange();
+        _consumer.externalStateChange();
 
         _channel.acknowledgeMessage(1, false);
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Wed Feb  5 10:29:55 2014
@@ -27,9 +27,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -65,7 +63,7 @@ public class ExtractResendAndRequeueTest
     private static final int INITIAL_MSG_COUNT = 10;
     private AMQQueue _queue;
     private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
-    private Subscription _subscription;
+    private Consumer _consumer;
     private boolean _queueDeleted;
 
     @Override
@@ -76,8 +74,8 @@ public class ExtractResendAndRequeueTest
         _queue = mock(AMQQueue.class);
         when(_queue.getName()).thenReturn(getName());
         when(_queue.isDeleted()).thenReturn(_queueDeleted);
-        _subscription = mock(Subscription.class);
-        when(_subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
+        _consumer = mock(Consumer.class);
+        when(_consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
 
 
         long id = 0;
@@ -123,7 +121,7 @@ public class ExtractResendAndRequeueTest
         // Acquire messages in subscription
         for(QueueEntry entry : messageList)
         {
-            when(entry.getDeliveredSubscription()).thenReturn(_subscription);
+            when(entry.getDeliveredConsumer()).thenReturn(_consumer);
         }
     }
 
@@ -168,7 +166,7 @@ public class ExtractResendAndRequeueTest
         acquireMessages(_referenceList);
 
         // Close subscription
-        when(_subscription.isClosed()).thenReturn(true);
+        when(_consumer.isClosed()).thenReturn(true);
 
         final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
         final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Wed Feb  5 10:29:55 2014
@@ -49,7 +49,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -245,7 +245,7 @@ public class InternalTestProtocolSession
 
 
         @Override
-        public void deliverToClient(Subscription sub, ServerMessage message,
+        public void deliverToClient(Consumer sub, ServerMessage message,
                                     InstanceProperties props, long deliveryTag) throws AMQException
         {
             _deliveryCount.incrementAndGet();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Wed Feb  5 10:29:55 2014
@@ -29,7 +29,7 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.SimpleAMQQueue;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -131,7 +131,7 @@ public class QueueBrowserUsesNoAckTest e
         // indicate we are using the prefetch credit. i.e. using acks not No-Ack
         assertTrue("The subscription has been suspended",
                    !getChannel().getSubscription(browser).getState()
-                           .equals(Subscription.State.SUSPENDED));
+                           .equals(Consumer.State.SUSPENDED));
     }
 
     private void checkStoreContents(int messageCount)

Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (from r1564581, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java&r1=1564581&r2=1564703&rev=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Wed Feb  5 10:29:55 2014
@@ -43,15 +43,14 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.ServerTransaction;
 
 import java.nio.ByteBuffer;
 import java.util.List;
 
-class SubscriptionTarget_1_0 extends AbstractSubscriptionTarget
+class ConsumerTarget_1_0 extends AbstractConsumerTarget
 {
     private final boolean _acquires;
     private SendingLink_1_0 _link;
@@ -61,10 +60,10 @@ class SubscriptionTarget_1_0 extends Abs
     private Binary _transactionId;
     private final AMQPDescribedTypeRegistry _typeRegistry;
     private final SectionEncoder _sectionEncoder;
-    private Subscription _subscription;
+    private Consumer _consumer;
 
-    public SubscriptionTarget_1_0(final SendingLink_1_0 link,
-                                  boolean acquires)
+    public ConsumerTarget_1_0(final SendingLink_1_0 link,
+                              boolean acquires)
     {
         super(State.SUSPENDED);
         _link = link;
@@ -73,9 +72,9 @@ class SubscriptionTarget_1_0 extends Abs
         _acquires = acquires;
     }
 
-    public Subscription getSubscription()
+    public Consumer getConsumer()
     {
-        return _subscription;
+        return _consumer;
     }
 
     private SendingLinkEndpoint getEndpoint()
@@ -94,7 +93,7 @@ class SubscriptionTarget_1_0 extends Abs
         boolean closed = false;
         State state = getState();
 
-        getSubscription().getSendLock();
+        getConsumer().getSendLock();
         try
         {
             while(!closed && state != State.CLOSED)
@@ -109,7 +108,7 @@ class SubscriptionTarget_1_0 extends Abs
         }
         finally
         {
-            getSubscription().releaseSendLock();
+            getConsumer().releaseSendLock();
         }
     }
 
@@ -255,7 +254,7 @@ class SubscriptionTarget_1_0 extends Abs
 
                             public void onRollback()
                             {
-                                if(queueEntry.isAcquiredBy(getSubscription()))
+                                if(queueEntry.isAcquiredBy(getConsumer()))
                                 {
                                     queueEntry.release();
                                     _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
@@ -385,7 +384,7 @@ class SubscriptionTarget_1_0 extends Abs
 
                             public void postCommit()
                             {
-                                if(_queueEntry.isAcquiredBy(getSubscription()))
+                                if(_queueEntry.isAcquiredBy(getConsumer()))
                                 {
                                     _queueEntry.delete();
                                 }
@@ -499,13 +498,13 @@ class SubscriptionTarget_1_0 extends Abs
     }
 
     @Override
-    public void subscriptionRegistered(final Subscription sub)
+    public void consumerAdded(final Consumer sub)
     {
-        _subscription = sub;
+        _consumer = sub;
     }
 
     @Override
-    public void subscriptionRemoved(final Subscription sub)
+    public void consumerRemoved(final Consumer sub)
     {
 
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Wed Feb  5 10:29:55 2014
@@ -68,8 +68,7 @@ import org.apache.qpid.server.filter.Sim
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
@@ -82,8 +81,8 @@ public class SendingLink_1_0 implements 
     private VirtualHost _vhost;
     private SendingDestination _destination;
 
-    private Subscription _subscription;
-    private SubscriptionTarget_1_0 _target;
+    private Consumer _consumer;
+    private ConsumerTarget_1_0 _target;
 
     private boolean _draining;
     private final Map<Binary, MessageInstance> _unsettledMap =
@@ -112,7 +111,7 @@ public class SendingLink_1_0 implements 
         linkAttachment.setDeliveryStateHandler(this);
         QueueDestination qd = null;
 
-        EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+        EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
 
 
         boolean noLocal = false;
@@ -175,11 +174,11 @@ public class SendingLink_1_0 implements 
             }
             source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
 
-            _target = new SubscriptionTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
+            _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
             if(source.getDistributionMode() != StdDistMode.COPY)
             {
-                options.add(Subscription.Option.ACQUIRES);
-                options.add(Subscription.Option.SEES_REQUEUES);
+                options.add(Consumer.Option.ACQUIRES);
+                options.add(Consumer.Option.SEES_REQUEUES);
             }
 
         }
@@ -376,9 +375,9 @@ public class SendingLink_1_0 implements 
             }
 
 
-            _target = new SubscriptionTarget_1_0(this, true);
-            options.add(Subscription.Option.ACQUIRES);
-            options.add(Subscription.Option.SEES_REQUEUES);
+            _target = new ConsumerTarget_1_0(this, true);
+            options.add(Consumer.Option.ACQUIRES);
+            options.add(Consumer.Option.SEES_REQUEUES);
 
         }
         else
@@ -390,18 +389,18 @@ public class SendingLink_1_0 implements 
         {
             if(noLocal)
             {
-                options.add(Subscription.Option.NO_LOCAL);
+                options.add(Consumer.Option.NO_LOCAL);
             }
 
 
-            _subscription.setNoLocal(noLocal);
+            _consumer.setNoLocal(noLocal);
 
 
             try
             {
-                _subscription = _queue.registerSubscription(_target,
-                                                           messageFilter == null ? null : new SimpleFilterManager(messageFilter),
-                                                           Message_1_0.class, getEndpoint().getName(), options);
+                _consumer = _queue.addConsumer(_target,
+                                               messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+                                               Message_1_0.class, getEndpoint().getName(), options);
             }
             catch (AMQException e)
             {
@@ -428,7 +427,7 @@ public class SendingLink_1_0 implements 
             try
             {
 
-                _subscription.close();
+                _consumer.close();
 
             }
             catch (AMQException e)
@@ -622,7 +621,7 @@ public class SendingLink_1_0 implements 
     public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
     {
 
-        if(_subscription.isActive())
+        if(_consumer.isActive())
         {
             _target.suspend();
         }
@@ -653,7 +652,7 @@ public class SendingLink_1_0 implements 
                 if(outcome instanceof Accepted)
                 {
                     AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
-                    if(_subscription.acquires())
+                    if(_consumer.acquires())
                     {
                         txn.dequeue(Collections.singleton(queueEntry),
                                 new ServerTransaction.Action()
@@ -673,7 +672,7 @@ public class SendingLink_1_0 implements 
                 else if(outcome instanceof Released)
                 {
                     AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getMessageStore());
-                    if(_subscription.acquires())
+                    if(_consumer.acquires())
                     {
                         txn.dequeue(Collections.singleton(queueEntry),
                                 new ServerTransaction.Action()

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java Wed Feb  5 10:29:55 2014
@@ -20,7 +20,6 @@ package org.apache.qpid.server.managemen
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -40,7 +39,7 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.queue.QueueEntryVisitor;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
 
@@ -327,8 +326,8 @@ public class MessageServlet extends Abst
                                    : entry.isAcquired()
                                              ? "Acquired"
                                              : "");
-        final Subscription deliveredSubscription = entry.getDeliveredSubscription();
-        object.put("deliveredTo", deliveredSubscription == null ? null : deliveredSubscription.getSubscriptionID());
+        final Consumer deliveredConsumer = entry.getDeliveredConsumer();
+        object.put("deliveredTo", deliveredConsumer == null ? null : deliveredConsumer.getId());
         ServerMessage message = entry.getMessage();
 
         if(message != null)

Copied: qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java (from r1564581, qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java&r1=1564581&r2=1564703&rev=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java Wed Feb  5 10:29:55 2014
@@ -46,7 +46,7 @@ import java.util.List;
  * SUB-1002 : Close
  * SUB-1003 : State : <state>
  */
-public class SubscriptionLoggingTest extends AbstractTestLogging
+public class ConsumerLoggingTest extends AbstractTestLogging
 {
     static final String SUB_PREFIX = "SUB-";
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org