You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/04 10:17:56 UTC

svn commit: r1772514 [2/2] - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/filter/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/or...

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java Sun Dec  4 10:17:55 2016
@@ -19,19 +19,17 @@
 package org.apache.qpid.server.virtualhost;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.EnumSet;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
-import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.queue.AbstractQueue;
+import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -53,12 +51,12 @@ public class VirtualHostPropertiesNodeTe
 
     public void testAddConsumer() throws Exception
     {
-        final EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+        final EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
         final ConsumerTarget target = mock(ConsumerTarget.class);
         when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
 
-        ConsumerImpl consumer = _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
-        final AbstractQueue.MessageContainer messageContainer = consumer.pullMessage();
+        MessageInstanceConsumer consumer = _virtualHostPropertiesNode.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
+        final MessageContainer messageContainer = consumer.pullMessage();
         assertNotNull("Could not pull message from VirtualHostPropertyNode", messageContainer);
     }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Sun Dec  4 10:17:55 2016
@@ -31,12 +31,12 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
 import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
@@ -186,7 +186,7 @@ public class ConsumerTarget_0_10 extends
 
     private final AddMessageDispositionListenerAction _postIdSettingAction;
 
-    public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
+    public void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, boolean batch)
     {
         ServerMessage serverMsg = entry.getMessage();
 
@@ -409,7 +409,7 @@ public class ConsumerTarget_0_10 extends
                            });
    }
 
-    void reject(final ConsumerImpl consumer, final MessageInstance entry)
+    void reject(final MessageInstanceConsumer consumer, final MessageInstance entry)
     {
         entry.setRedelivered();
         if (entry.makeAcquisitionUnstealable(consumer))
@@ -418,7 +418,7 @@ public class ConsumerTarget_0_10 extends
         }
     }
 
-    void release(final ConsumerImpl consumer,
+    void release(final MessageInstanceConsumer consumer,
                  final MessageInstance entry,
                  final boolean setRedelivered)
     {
@@ -442,7 +442,7 @@ public class ConsumerTarget_0_10 extends
         }
     }
 
-    protected void sendToDLQOrDiscard(final ConsumerImpl consumer, MessageInstance entry)
+    protected void sendToDLQOrDiscard(final MessageInstanceConsumer consumer, MessageInstance entry)
     {
         final ServerMessage msg = entry.getMessage();
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Sun Dec  4 10:17:55 2016
@@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 
 
 class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -34,11 +34,11 @@ class ExplicitAcceptDispositionChangeLis
 
     private final MessageInstance _entry;
     private final ConsumerTarget_0_10 _target;
-    private final ConsumerImpl _consumer;
+    private final MessageInstanceConsumer _consumer;
 
     public ExplicitAcceptDispositionChangeListener(MessageInstance entry,
                                                    ConsumerTarget_0_10 target,
-                                                   final ConsumerImpl consumer)
+                                                   final MessageInstanceConsumer consumer)
     {
         _entry = entry;
         _target = target;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Sun Dec  4 10:17:55 2016
@@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 
 class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
 {
@@ -33,11 +33,11 @@ class ImplicitAcceptDispositionChangeLis
 
     private final MessageInstance _entry;
     private final ConsumerTarget_0_10 _target;
-    private final ConsumerImpl _consumer;
+    private final MessageInstanceConsumer _consumer;
 
     public ImplicitAcceptDispositionChangeListener(MessageInstance entry,
                                                    ConsumerTarget_0_10 target,
-                                                   final ConsumerImpl consumer)
+                                                   final MessageInstanceConsumer consumer)
     {
         _entry = entry;
         _target = target;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Sun Dec  4 10:17:55 2016
@@ -21,8 +21,8 @@
 
 package org.apache.qpid.server.protocol.v0_10;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.transport.Method;
 
 public class MessageAcceptCompletionListener implements Method.CompletionListener
@@ -30,12 +30,12 @@ public class MessageAcceptCompletionList
     private final ConsumerTarget_0_10 _sub;
     private final MessageInstance _entry;
     private final ServerSession _session;
-    private final ConsumerImpl _consumer;
+    private final MessageInstanceConsumer _consumer;
     private long _messageSize;
     private boolean _restoreCredit;
 
     public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub,
-                                           final ConsumerImpl consumer,
+                                           final MessageInstanceConsumer consumer,
                                            ServerSession session,
                                            MessageInstance entry,
                                            boolean restoreCredit)

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Sun Dec  4 10:17:55 2016
@@ -55,7 +55,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.logging.LogMessage;
@@ -65,6 +64,7 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfigurationChangeListener;
@@ -541,7 +541,7 @@ public class ServerSession extends Sessi
         // Broker shouldn't block awaiting close - thus do override this method to do nothing
     }
 
-    public void acknowledge(final ConsumerImpl consumer,
+    public void acknowledge(final MessageInstanceConsumer consumer,
                             final ConsumerTarget_0_10 target,
                             final MessageInstance entry)
     {
@@ -578,11 +578,11 @@ public class ServerSession extends Sessi
     }
 
 
-    public void register(final ConsumerImpl consumerImpl)
+    public void register(final MessageInstanceConsumer messageInstanceConsumer)
     {
-        if(consumerImpl instanceof Consumer<?>)
+        if(messageInstanceConsumer instanceof Consumer<?>)
         {
-            final Consumer<?> consumer = (Consumer<?>) consumerImpl;
+            final Consumer<?> consumer = (Consumer<?>) messageInstanceConsumer;
             _consumers.add(consumer);
             consumer.addChangeListener(_consumerClosedListener);
             consumerAdded(consumer);

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Sun Dec  4 10:17:55 2016
@@ -39,7 +39,7 @@ import org.apache.qpid.bytebuffer.QpidBy
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.ErrorCodes;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.filter.ArrivalTimeFilter;
 import org.apache.qpid.server.filter.FilterManager;
@@ -337,18 +337,18 @@ public class ServerSessionDelegate exten
                     ((ServerSession)session).register(destination, target);
                     try
                     {
-                        EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+                        EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
                         if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
                         {
-                            options.add(ConsumerImpl.Option.ACQUIRES);
+                            options.add(ConsumerOption.ACQUIRES);
                         }
                         if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
                         {
-                            options.add(ConsumerImpl.Option.SEES_REQUEUES);
+                            options.add(ConsumerOption.SEES_REQUEUES);
                         }
                         if(method.getExclusive())
                         {
-                            options.add(ConsumerImpl.Option.EXCLUSIVE);
+                            options.add(ConsumerOption.EXCLUSIVE);
                         }
                         for(MessageSource source : sources)
                         {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sun Dec  4 10:17:55 2016
@@ -59,7 +59,7 @@ import org.apache.qpid.exchange.Exchange
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.ErrorCodes;
 import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
@@ -78,6 +78,7 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
@@ -306,8 +307,8 @@ public class AMQChannel
                 new GetDeliveryMethod(queue);
 
         ConsumerTarget_0_8 target;
-        EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
-                                                          ConsumerImpl.Option.SEES_REQUEUES);
+        EnumSet<ConsumerOption> options = EnumSet.of(ConsumerOption.TRANSIENT, ConsumerOption.ACQUIRES,
+                                                     ConsumerOption.SEES_REQUEUES);
         if (acks)
         {
 
@@ -322,7 +323,7 @@ public class AMQChannel
                                                              INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
         }
 
-        ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
+        MessageInstanceConsumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options, null);
         target.updateNotifyWorkDesired();
         target.sendNextMessage();
         target.close();
@@ -710,7 +711,7 @@ public class AMQChannel
         }
 
         ConsumerTarget_0_8 target;
-        EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+        EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
         final boolean multiQueue = sources.size()>1;
         if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
         {
@@ -720,20 +721,20 @@ public class AMQChannel
         else if(acks)
         {
             target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager, multiQueue);
-            options.add(ConsumerImpl.Option.ACQUIRES);
-            options.add(ConsumerImpl.Option.SEES_REQUEUES);
+            options.add(ConsumerOption.ACQUIRES);
+            options.add(ConsumerOption.SEES_REQUEUES);
         }
         else
         {
             target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments,
                                                           INFINITE_CREDIT_CREDIT_MANAGER, multiQueue);
-            options.add(ConsumerImpl.Option.ACQUIRES);
-            options.add(ConsumerImpl.Option.SEES_REQUEUES);
+            options.add(ConsumerOption.ACQUIRES);
+            options.add(ConsumerOption.SEES_REQUEUES);
         }
 
         if(exclusive)
         {
-            options.add(ConsumerImpl.Option.EXCLUSIVE);
+            options.add(ConsumerOption.EXCLUSIVE);
         }
 
 
@@ -817,7 +818,7 @@ public class AMQChannel
 
             for(MessageSource source : sources)
             {
-                ConsumerImpl sub =
+                MessageInstanceConsumer sub =
                         source.addConsumer(target,
                                            filterManager,
                                            AMQMessage.class,
@@ -859,10 +860,10 @@ public class AMQChannel
         }
 
         ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
-        Collection<ConsumerImpl> subs = target == null ? null : target.getConsumers();
+        Collection<MessageInstanceConsumer> subs = target == null ? null : target.getConsumers();
         if (subs != null)
         {
-            for(ConsumerImpl sub : subs)
+            for(MessageInstanceConsumer sub : subs)
             {
                 if (sub instanceof Consumer<?>)
                 {
@@ -955,7 +956,7 @@ public class AMQChannel
      */
     public void addUnacknowledgedMessage(MessageInstance entry,
                                          long deliveryTag,
-                                         ConsumerImpl consumer,
+                                         MessageInstanceConsumer consumer,
                                          final boolean usesCredit)
     {
         if (_logger.isDebugEnabled())
@@ -1010,7 +1011,7 @@ public class AMQChannel
         for (Map.Entry<Long, MessageConsumerAssociation> entry : copy.entrySet())
         {
             MessageInstance unacked = entry.getValue().getMessageInstance();
-            ConsumerImpl consumer = entry.getValue().getConsumer();
+            MessageInstanceConsumer consumer = entry.getValue().getConsumer();
             // Mark message redelivered
             unacked.setRedelivered();
             // here we wish to restore credit
@@ -1117,7 +1118,7 @@ public class AMQChannel
         {
             long deliveryTag = entry.getKey();
             MessageInstance message = entry.getValue().getMessageInstance();
-            ConsumerImpl consumer = entry.getValue().getConsumer();
+            MessageInstanceConsumer consumer = entry.getValue().getConsumer();
 
             // Without any details from the client about what has been processed we have to mark
             // all messages in the unacked map as redelivered.
@@ -1142,7 +1143,7 @@ public class AMQChannel
         {
             long deliveryTag = entry.getKey();
             MessageInstance message = entry.getValue().getMessageInstance();
-            ConsumerImpl consumer = entry.getValue().getConsumer();
+            MessageInstanceConsumer consumer = entry.getValue().getConsumer();
 
             //Amend the delivery counter as the client hasn't seen these messages yet.
             message.decrementDeliveryCount();
@@ -1187,7 +1188,7 @@ public class AMQChannel
                 // may need to deliver queued messages
                 for (ConsumerTarget_0_8 s : getConsumerTargets())
                 {
-                    for(ConsumerImpl sub : s.getConsumers())
+                    for(MessageInstanceConsumer sub : s.getConsumers())
                     {
                         sub.externalStateChange();
                     }
@@ -1273,7 +1274,7 @@ public class AMQChannel
         for(MessageConsumerAssociation association : _resendList)
         {
             final MessageInstance messageInstance = association.getMessageInstance();
-            final ConsumerImpl consumer = association.getConsumer();
+            final MessageInstanceConsumer consumer = association.getConsumer();
             if (consumer.isClosed())
             {
                 messageInstance.release(consumer);
@@ -1298,7 +1299,7 @@ public class AMQChannel
             _suspended.set(false);
             for(ConsumerTarget_0_8 target : getConsumerTargets())
             {
-                for(ConsumerImpl sub : target.getConsumers())
+                for(MessageInstanceConsumer sub : target.getConsumers())
                 {
                     sub.externalStateChange();
                 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Sun Dec  4 10:17:55 2016
@@ -27,11 +27,11 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -96,7 +96,7 @@ public abstract class ConsumerTarget_0_8
          * @throws QpidException
          */
         @Override
-        public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+        public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
         {
             // We don't decrement the reference here as we don't want to consume the message
             // but we do want to send it to the client.
@@ -142,7 +142,7 @@ public abstract class ConsumerTarget_0_8
          * @param batch
          */
         @Override
-        public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+        public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
         {
             // if we do not need to wait for client acknowledgements
             // we can decrement the reference count immediately.
@@ -245,7 +245,7 @@ public abstract class ConsumerTarget_0_8
          * @param batch
          */
         @Override
-        public void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+        public void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
         {
 
             // put queue entry on a list and then notify the connection to read list.
@@ -397,7 +397,7 @@ public abstract class ConsumerTarget_0_8
         updateNotifyWorkDesired();
     }
 
-    protected long sendToClient(final ConsumerImpl consumer, final ServerMessage message,
+    protected long sendToClient(final MessageInstanceConsumer consumer, final ServerMessage message,
                                 final InstanceProperties props,
                                 final long deliveryTag)
     {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java Sun Dec  4 10:17:55 2016
@@ -20,14 +20,14 @@
 
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 
 public interface MessageConsumerAssociation
 {
     MessageInstance getMessageInstance();
 
-    ConsumerImpl getConsumer();
+    MessageInstanceConsumer getConsumer();
 
     long getSize();
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Sun Dec  4 10:17:55 2016
@@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.
 import java.util.Collection;
 import java.util.Map;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 
 
 public interface UnacknowledgedMessageMap
@@ -38,7 +38,7 @@ public interface UnacknowledgedMessageMa
 
     void visit(Visitor visitor);
 
-    void add(long deliveryTag, MessageInstance message, final ConsumerImpl target, final boolean usesCredit);
+    void add(long deliveryTag, MessageInstance message, final MessageInstanceConsumer consumer, final boolean usesCredit);
 
     MessageConsumerAssociation remove(long deliveryTag, final boolean restoreCredit);
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Sun Dec  4 10:17:55 2016
@@ -27,8 +27,8 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
 class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
@@ -36,10 +36,10 @@ class UnacknowledgedMessageMapImpl imple
     private static final class MessageConsumerAssociationImpl implements MessageConsumerAssociation
     {
         private final MessageInstance _messageInstance;
-        private final ConsumerImpl _consumer;
+        private final MessageInstanceConsumer _consumer;
         private final boolean _usesCredit;
 
-        private MessageConsumerAssociationImpl(final MessageInstance messageInstance, final ConsumerImpl consumer, final boolean usesCredit)
+        private MessageConsumerAssociationImpl(final MessageInstance messageInstance, final MessageInstanceConsumer consumer, final boolean usesCredit)
         {
             _messageInstance = messageInstance;
             _consumer = consumer;
@@ -53,7 +53,7 @@ class UnacknowledgedMessageMapImpl imple
         }
 
         @Override
-        public ConsumerImpl getConsumer()
+        public MessageInstanceConsumer getConsumer()
         {
             return _consumer;
         }
@@ -81,6 +81,7 @@ class UnacknowledgedMessageMapImpl imple
         _creditRestorer = creditRestorer;
     }
 
+    @Override
     public void collect(long deliveryTag, boolean multiple, Map<Long, MessageConsumerAssociation> msgs)
     {
         if (multiple)
@@ -106,6 +107,7 @@ class UnacknowledgedMessageMapImpl imple
         }
     }
 
+    @Override
     public MessageConsumerAssociation remove(long deliveryTag, final boolean restoreCredit)
     {
         MessageConsumerAssociationImpl entry = _map.remove(deliveryTag);
@@ -120,6 +122,7 @@ class UnacknowledgedMessageMapImpl imple
         return entry;
     }
 
+    @Override
     public void visit(Visitor visitor)
     {
         for (Map.Entry<Long, MessageConsumerAssociationImpl> entry : _map.entrySet())
@@ -129,7 +132,8 @@ class UnacknowledgedMessageMapImpl imple
         visitor.visitComplete();
     }
 
-    public void add(long deliveryTag, MessageInstance message, final ConsumerImpl consumer, final boolean usesCredit)
+    @Override
+    public void add(long deliveryTag, MessageInstance message, final MessageInstanceConsumer consumer, final boolean usesCredit)
     {
         if(_map.put(deliveryTag, new MessageConsumerAssociationImpl(message, consumer, usesCredit)) == null)
         {
@@ -141,17 +145,20 @@ class UnacknowledgedMessageMapImpl imple
         }
     }
 
+    @Override
     public int size()
     {
         return _size;
     }
 
+    @Override
     public MessageInstance get(long key)
     {
         MessageConsumerAssociation association = _map.get(key);
         return association == null ? null : association.getMessageInstance();
     }
 
+    @Override
     public Collection<MessageConsumerAssociation> acknowledge(long deliveryTag, boolean multiple)
     {
         if(multiple)

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Sun Dec  4 10:17:55 2016
@@ -28,8 +28,8 @@ import java.util.Collection;
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -45,7 +45,7 @@ public class UnacknowledgedMessageMapTes
             return input.getMessageInstance();
         }
     };
-    private final ConsumerImpl _consumer = mock(ConsumerImpl.class);
+    private final MessageInstanceConsumer _consumer = mock(MessageInstanceConsumer.class);
 
     public void testDeletedMessagesCantBeAcknowledged()
     {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Sun Dec  4 10:17:55 2016
@@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
@@ -100,7 +100,7 @@ class ConsumerTarget_1_0 extends Abstrac
 
     }
 
-    public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
+    public void doSend(final MessageInstanceConsumer consumer, final MessageInstance entry, boolean batch)
     {
         // TODO
         ServerMessage serverMessage = entry.getMessage();
@@ -330,16 +330,16 @@ class ConsumerTarget_1_0 extends Abstrac
 
         private final MessageInstance _queueEntry;
         private final Binary _deliveryTag;
-        private final ConsumerImpl _consumer;
+        private final MessageInstanceConsumer _consumer;
 
-        public DispositionAction(Binary tag, MessageInstance queueEntry, final ConsumerImpl consumer)
+        public DispositionAction(Binary tag, MessageInstance queueEntry, final MessageInstanceConsumer consumer)
         {
             _deliveryTag = tag;
             _queueEntry = queueEntry;
             _consumer = consumer;
         }
 
-        public ConsumerImpl getConsumer()
+        public MessageInstanceConsumer getConsumer()
         {
             return _consumer;
         }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Sun Dec  4 10:17:55 2016
@@ -39,10 +39,11 @@ import org.apache.qpid.exchange.Exchange
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.JMSSelectorFilter;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Binding;
@@ -84,7 +85,7 @@ public class SendingLink_1_0 implements
     private NamedAddressSpace _addressSpace;
     private SendingDestination _destination;
 
-    private ConsumerImpl _consumer;
+    private MessageInstanceConsumer _consumer;
     private ConsumerTarget_1_0 _target;
 
     private boolean _draining;
@@ -113,7 +114,7 @@ public class SendingLink_1_0 implements
         _durability = source.getDurable();
         QueueDestination qd = null;
 
-        EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+        EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
 
 
         boolean noLocal = false;
@@ -169,8 +170,8 @@ public class SendingLink_1_0 implements
             _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY);
             if(source.getDistributionMode() != StdDistMode.COPY)
             {
-                options.add(ConsumerImpl.Option.ACQUIRES);
-                options.add(ConsumerImpl.Option.SEES_REQUEUES);
+                options.add(ConsumerOption.ACQUIRES);
+                options.add(ConsumerOption.SEES_REQUEUES);
             }
 
         }
@@ -329,8 +330,8 @@ public class SendingLink_1_0 implements
 
 
             _target = new ConsumerTarget_1_0(this, true);
-            options.add(ConsumerImpl.Option.ACQUIRES);
-            options.add(ConsumerImpl.Option.SEES_REQUEUES);
+            options.add(ConsumerOption.ACQUIRES);
+            options.add(ConsumerOption.SEES_REQUEUES);
 
         }
         else
@@ -342,13 +343,13 @@ public class SendingLink_1_0 implements
         {
             if(noLocal)
             {
-                options.add(ConsumerImpl.Option.NO_LOCAL);
+                options.add(ConsumerOption.NO_LOCAL);
             }
 
             if(_durability == TerminusDurability.CONFIGURATION ||
                _durability == TerminusDurability.UNSETTLED_STATE )
             {
-                options.add(ConsumerImpl.Option.DURABLE);
+                options.add(ConsumerOption.DURABLE);
             }
 
             try
@@ -692,7 +693,7 @@ public class SendingLink_1_0 implements
         return _addressSpace;
     }
 
-    public ConsumerImpl getConsumer()
+    public MessageInstanceConsumer getConsumer()
     {
         return _consumer;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Sun Dec  4 10:17:55 2016
@@ -49,7 +49,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
 import org.apache.qpid.server.logging.EventLogger;
@@ -58,6 +57,7 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
@@ -1084,7 +1084,7 @@ public class Session_1_0 implements AMQS
 
     private void registerConsumer(final SendingLink_1_0 link)
     {
-        ConsumerImpl consumer = link.getConsumer();
+        MessageInstanceConsumer consumer = link.getConsumer();
         if(consumer instanceof Consumer<?>)
         {
             Consumer<?> modelConsumer = (Consumer<?>) consumer;

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Sun Dec  4 10:17:55 2016
@@ -50,7 +50,7 @@ import javax.security.auth.Subject;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.Filterable;
@@ -58,6 +58,7 @@ import org.apache.qpid.server.message.AM
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
@@ -1434,7 +1435,7 @@ class ManagementNode implements MessageS
                                                             final FilterManager filters,
                                                             final Class<? extends ServerMessage> messageClass,
                                                             final String consumerName,
-                                                            final EnumSet<ConsumerImpl.Option> options,
+                                                            final EnumSet<ConsumerOption> options,
                                                             final Integer priority)
     {
 
@@ -1543,7 +1544,7 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public ConsumerImpl getAcquiringConsumer()
+        public MessageInstanceConsumer getAcquiringConsumer()
         {
             return null;
         }
@@ -1555,13 +1556,13 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public boolean isAcquiredBy(final ConsumerImpl consumer)
+        public boolean isAcquiredBy(final MessageInstanceConsumer consumer)
         {
             return false;
         }
 
         @Override
-        public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+        public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer)
         {
             return false;
         }
@@ -1579,19 +1580,13 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public ConsumerImpl getDeliveredConsumer()
-        {
-            return null;
-        }
-
-        @Override
         public void reject()
         {
 
         }
 
         @Override
-        public boolean isRejectedBy(final ConsumerImpl consumer)
+        public boolean isRejectedBy(final MessageInstanceConsumer consumer)
         {
             return false;
         }
@@ -1609,13 +1604,13 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public boolean acquire(final ConsumerImpl sub)
+        public boolean acquire(final MessageInstanceConsumer sub)
         {
             return false;
         }
 
         @Override
-        public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+        public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
         {
             return false;
         }
@@ -1671,7 +1666,7 @@ class ManagementNode implements MessageS
         }
 
         @Override
-        public void release(final ConsumerImpl release)
+        public void release(final MessageInstanceConsumer release)
         {
 
         }

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java Sun Dec  4 10:17:55 2016
@@ -26,29 +26,28 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AbstractQueue;
+import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 
-class ManagementNodeConsumer implements ConsumerImpl, MessageDestination
+class ManagementNodeConsumer implements MessageInstanceConsumer, MessageDestination
 {
-    private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
     private final ManagementNode _managementNode;
     private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>());
     private final ConsumerTarget _target;
     private final String _name;
+    private final Object _identifier = new Object();
 
 
     public ManagementNodeConsumer(final String consumerName, final ManagementNode managementNode, ConsumerTarget target)
@@ -68,7 +67,13 @@ class ManagementNodeConsumer implements
     }
 
     @Override
-    public AbstractQueue.MessageContainer pullMessage()
+    public Object getIdentifier()
+    {
+        return _identifier;
+    }
+
+    @Override
+    public MessageContainer pullMessage()
     {
         if (!_queue.isEmpty())
         {
@@ -77,7 +82,7 @@ class ManagementNodeConsumer implements
             if (!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
             {
                 _queue.remove(0);
-                return new AbstractQueue.MessageContainer(managementResponse, null, false);
+                return new MessageContainer(managementResponse, null, false);
             }
         }
         return null;
@@ -92,55 +97,12 @@ class ManagementNodeConsumer implements
         }
     }
 
-    @Override
-    public long getBytesOut()
-    {
-        return 0;
-    }
-
-    @Override
-    public long getMessagesOut()
-    {
-        return 0;
-    }
-
-    @Override
-    public long getUnacknowledgedBytes()
-    {
-        return 0;
-    }
-
-    @Override
-    public long getUnacknowledgedMessages()
-    {
-        return 0;
-    }
-
-    @Override
-    public AMQSessionModel getSessionModel()
+    AMQSessionModel getSessionModel()
     {
         return _target.getSessionModel();
     }
 
     @Override
-    public MessageSource getMessageSource()
-    {
-        return _managementNode;
-    }
-
-    @Override
-    public long getConsumerNumber()
-    {
-        return _id;
-    }
-
-    @Override
-    public boolean isSuspended()
-    {
-        return false;
-    }
-
-    @Override
     public boolean isClosed()
     {
         return false;
@@ -153,24 +115,11 @@ class ManagementNodeConsumer implements
     }
 
     @Override
-    public boolean seesRequeues()
-    {
-        return false;
-    }
-
-    @Override
     public void close()
     {
         _managementNode.unregisterConsumer(this);
     }
 
-
-    @Override
-    public boolean isActive()
-    {
-        return false;
-    }
-
     @Override
     public NamedAddressSpace getAddressSpace()
     {

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java Sun Dec  4 10:17:55 2016
@@ -20,10 +20,10 @@
  */
 package org.apache.qpid.server.management.amqp;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -85,7 +85,7 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public ConsumerImpl getAcquiringConsumer()
+    public ManagementNodeConsumer getAcquiringConsumer()
     {
         return _consumer;
     }
@@ -97,13 +97,13 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public boolean isAcquiredBy(final ConsumerImpl consumer)
+    public boolean isAcquiredBy(final MessageInstanceConsumer consumer)
     {
         return consumer == _consumer && !isDeleted();
     }
 
     @Override
-    public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+    public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer)
     {
         return consumer == _consumer;
     }
@@ -121,19 +121,13 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public ManagementNodeConsumer getDeliveredConsumer()
-    {
-        return isDeleted() ? null : _consumer;
-    }
-
-    @Override
     public void reject()
     {
         delete();
     }
 
     @Override
-    public boolean isRejectedBy(final ConsumerImpl consumer)
+    public boolean isRejectedBy(final MessageInstanceConsumer consumer)
     {
         return false;
     }
@@ -151,13 +145,13 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public boolean acquire(final ConsumerImpl sub)
+    public boolean acquire(final MessageInstanceConsumer sub)
     {
         return false;
     }
 
     @Override
-    public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+    public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
     {
         return false;
     }
@@ -213,7 +207,7 @@ class ManagementResponse implements Mess
     }
 
     @Override
-    public void release(final ConsumerImpl release)
+    public void release(final MessageInstanceConsumer release)
     {
         release();
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org