You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/04 10:17:56 UTC

svn commit: r1772514 [1/2] - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/filter/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/or...

Author: rgodfrey
Date: Sun Dec  4 10:17:55 2016
New Revision: 1772514

URL: http://svn.apache.org/viewvc?rev=1772514&view=rev
Log:
QPID-7572 : Remove ConsumerImpl interface and add MessageInstanceConsumer

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerOption.java   (with props)
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java   (with props)
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java   (with props)
Removed:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNodeTest.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConsumerAssociation.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Sun Dec  4 10:17:55 2016
@@ -35,9 +35,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AbstractQueue;
+import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
 import org.apache.qpid.server.transport.AMQPConnection;
 
@@ -56,9 +57,9 @@ public abstract class AbstractConsumerTa
 
     private final boolean _isMultiQueue;
     private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
-    private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
+    private final List<MessageInstanceConsumer> _consumers = new CopyOnWriteArrayList<>();
 
-    private Iterator<ConsumerImpl> _pullIterator;
+    private Iterator<MessageInstanceConsumer> _pullIterator;
     private boolean _notifyWorkDesired;
     private final AtomicBoolean _scheduled = new AtomicBoolean();
 
@@ -117,7 +118,7 @@ public abstract class AbstractConsumerTa
                 }
             }
 
-            for (ConsumerImpl consumer : _consumers)
+            for (MessageInstanceConsumer consumer : _consumers)
             {
                 consumer.setNotifyWorkDesired(desired);
             }
@@ -144,13 +145,13 @@ public abstract class AbstractConsumerTa
     }
 
     @Override
-    public void consumerAdded(final ConsumerImpl sub)
+    public void consumerAdded(final MessageInstanceConsumer sub)
     {
         _consumers.add(sub);
     }
 
     @Override
-    public ListenableFuture<Void> consumerRemoved(final ConsumerImpl sub)
+    public ListenableFuture<Void> consumerRemoved(final MessageInstanceConsumer sub)
     {
         if(_consumers.contains(sub))
         {
@@ -176,7 +177,7 @@ public abstract class AbstractConsumerTa
         return sessionModel.getAMQPConnection().doOnIOThreadAsync(task);
     }
 
-    private void consumerRemovedInternal(final ConsumerImpl sub)
+    private void consumerRemovedInternal(final MessageInstanceConsumer sub)
     {
         _consumers.remove(sub);
         if(_consumers.isEmpty())
@@ -185,7 +186,7 @@ public abstract class AbstractConsumerTa
         }
     }
 
-    public List<ConsumerImpl> getConsumers()
+    public List<MessageInstanceConsumer> getConsumers()
     {
         return _consumers;
     }
@@ -203,7 +204,7 @@ public abstract class AbstractConsumerTa
     }
 
     @Override
-    public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+    public final long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
     {
         doSend(consumer, entry, batch);
 
@@ -214,14 +215,14 @@ public abstract class AbstractConsumerTa
         return entry.getMessage().getSize();
     }
 
-    protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+    protected abstract void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch);
 
 
     @Override
     public boolean sendNextMessage()
     {
-        AbstractQueue.MessageContainer messageContainer = null;
-        ConsumerImpl consumer = null;
+        MessageContainer messageContainer = null;
+        MessageInstanceConsumer consumer = null;
         boolean iteratedCompleteList = false;
         while (messageContainer == null)
         {
@@ -270,12 +271,12 @@ public abstract class AbstractConsumerTa
     {
         if (_state.compareAndSet(State.OPEN, State.CLOSED))
         {
-            List<ConsumerImpl> consumers = new ArrayList<>(_consumers);
+            List<MessageInstanceConsumer> consumers = new ArrayList<>(_consumers);
             _consumers.clear();
 
             setNotifyWorkDesired(false);
 
-            for (ConsumerImpl consumer : consumers)
+            for (MessageInstanceConsumer consumer : consumers)
             {
                 consumer.close();
             }

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerOption.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerOption.java?rev=1772514&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerOption.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerOption.java Sun Dec  4 10:17:55 2016
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.consumer;
+
+public enum ConsumerOption
+{
+    ACQUIRES,
+    SEES_REQUEUES,
+    TRANSIENT,
+    EXCLUSIVE,
+    NO_LOCAL,
+    DURABLE
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerOption.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Sun Dec  4 10:17:55 2016
@@ -23,6 +23,7 @@ package org.apache.qpid.server.consumer;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 
@@ -49,9 +50,9 @@ public interface ConsumerTarget
 
     State getState();
 
-    void consumerAdded(ConsumerImpl sub);
+    void consumerAdded(MessageInstanceConsumer sub);
 
-    ListenableFuture<Void> consumerRemoved(ConsumerImpl sub);
+    ListenableFuture<Void> consumerRemoved(MessageInstanceConsumer sub);
 
     long getUnacknowledgedBytes();
 
@@ -59,7 +60,7 @@ public interface ConsumerTarget
 
     AMQSessionModel<?> getSessionModel();
 
-    long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+    long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch);
 
     boolean sendNextMessage();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java Sun Dec  4 10:17:55 2016
@@ -31,10 +31,9 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.queue.QueueConsumer;
 
 public class FilterSupport
 {
@@ -119,9 +118,9 @@ public class FilterSupport
     @PluggableService
     public static final class NoLocalFilter implements MessageFilter
     {
-        private final MessageSource _queue;
+        private final Queue<?> _queue;
 
-        private NoLocalFilter(MessageSource queue)
+        private NoLocalFilter(Queue<?> queue)
         {
             _queue = queue;
         }
@@ -135,8 +134,8 @@ public class FilterSupport
         public boolean matches(Filterable message)
         {
 
-            final Collection<? extends ConsumerImpl> consumers = _queue.getConsumers();
-            for(ConsumerImpl c : consumers)
+            final Collection<QueueConsumer<?>> consumers = _queue.getConsumers();
+            for(QueueConsumer<?> c : consumers)
             {
                 if(c.getSessionModel().getConnectionReference() == message.getConnectionReference())
                 {

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java?rev=1772514&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java Sun Dec  4 10:17:55 2016
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+public final class MessageContainer
+{
+    private final MessageInstance _messageInstance;
+    private final MessageReference<?> _messageReference;
+    private final boolean _hasNoAvailableMessages;
+
+    public MessageContainer(final boolean hasNoAvailableMessages)
+    {
+        this(null, null, hasNoAvailableMessages);
+    }
+
+    public MessageContainer(final MessageInstance messageInstance,
+                            final MessageReference<?> messageReference, final boolean hasNoAvailableMessages)
+    {
+        _messageInstance = messageInstance;
+        _messageReference = messageReference;
+        _hasNoAvailableMessages = hasNoAvailableMessages;
+    }
+
+    public MessageInstance getMessageInstance()
+    {
+        return _messageInstance;
+    }
+
+    public MessageReference<?> getMessageReference()
+    {
+        return _messageReference;
+    }
+
+    public boolean hasNoAvailableMessages()
+    {
+        return _hasNoAvailableMessages;
+    }
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfoImpl.java Sun Dec  4 10:17:55 2016
@@ -54,8 +54,8 @@ public class MessageInfoImpl implements
         final ServerMessage message = instance.getMessage();
         final AMQMessageHeader messageHeader = message.getMessageHeader();
 
-        _deliveredTo = instance.getDeliveredConsumer() == null ? null : String.valueOf(instance.getDeliveredConsumer()
-                                                                                                .getConsumerNumber());
+        _deliveredTo = instance.getAcquiringConsumer() == null ? null : String.valueOf(instance.getAcquiringConsumer()
+                                                                                                .getIdentifier());
         _arrivalTime = message.getArrivalTime() == 0L ? null : new Date(message.getArrivalTime());
         _persistent = message.isPersistent();
         _messageId = messageHeader.getMessageId();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Sun Dec  4 10:17:55 2016
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.message;
 
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -50,27 +49,25 @@ public interface MessageInstance
 
     boolean acquiredByConsumer();
 
-    boolean isAcquiredBy(ConsumerImpl consumer);
+    boolean isAcquiredBy(MessageInstanceConsumer consumer);
 
-    boolean removeAcquisitionFromConsumer(ConsumerImpl consumer);
+    boolean removeAcquisitionFromConsumer(MessageInstanceConsumer consumer);
 
     void setRedelivered();
 
     boolean isRedelivered();
 
-    ConsumerImpl getDeliveredConsumer();
-
     void reject();
 
-    boolean isRejectedBy(ConsumerImpl consumer);
+    boolean isRejectedBy(MessageInstanceConsumer consumer);
 
     boolean getDeliveredToConsumer();
 
     boolean expired();
 
-    boolean acquire(ConsumerImpl sub);
+    boolean acquire(MessageInstanceConsumer sub);
 
-    boolean makeAcquisitionUnstealable(final ConsumerImpl consumer);
+    boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer);
 
     boolean makeAcquisitionStealable();
 
@@ -80,7 +77,7 @@ public interface MessageInstance
 
     Filterable asFilterable();
 
-    ConsumerImpl getAcquiringConsumer();
+    MessageInstanceConsumer getAcquiringConsumer();
 
     MessageEnqueueRecord getEnqueueRecord();
 
@@ -171,7 +168,7 @@ public interface MessageInstance
         }
     }
 
-    abstract class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
+    abstract class ConsumerAcquiredState<C extends MessageInstanceConsumer> extends EntryState
     {
         public abstract C getConsumer();
 
@@ -188,7 +185,7 @@ public interface MessageInstance
         }
     }
 
-    final class StealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
+    final class StealableConsumerAcquiredState<C extends MessageInstanceConsumer> extends ConsumerAcquiredState
     {
         private final C _consumer;
         private final UnstealableConsumerAcquiredState<C> _unstealableState;
@@ -211,7 +208,7 @@ public interface MessageInstance
         }
     }
 
-    final class UnstealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
+    final class UnstealableConsumerAcquiredState<C extends MessageInstanceConsumer> extends ConsumerAcquiredState
     {
         private final StealableConsumerAcquiredState<C> _stealableState;
 
@@ -246,7 +243,7 @@ public interface MessageInstance
 
     void release();
 
-    void release(ConsumerImpl release);
+    void release(MessageInstanceConsumer release);
 
     void delete();
 

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java?rev=1772514&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java Sun Dec  4 10:17:55 2016
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.server.consumer.ConsumerTarget;
+
+public interface MessageInstanceConsumer
+{
+    boolean isClosed();
+
+    boolean acquires();
+
+    String getName();
+
+    void close();
+
+    void externalStateChange();
+
+    Object getIdentifier();
+
+    MessageContainer pullMessage();
+
+    ConsumerTarget getTarget();
+
+    void setNotifyWorkDesired(boolean desired);
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Sun Dec  4 10:17:55 2016
@@ -23,7 +23,7 @@ package org.apache.qpid.server.message;
 import java.util.Collection;
 import java.util.EnumSet;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -31,15 +31,15 @@ import org.apache.qpid.server.store.Tran
 
 public interface MessageSource extends TransactionLogResource, MessageNode
 {
-     ConsumerImpl addConsumer(ConsumerTarget target, FilterManager filters,
-                              Class<? extends ServerMessage> messageClass,
-                              String consumerName,
-                              EnumSet<ConsumerImpl.Option> options,
-                              Integer priority)
+    MessageInstanceConsumer addConsumer(ConsumerTarget target, FilterManager filters,
+                                        Class<? extends ServerMessage> messageClass,
+                                        String consumerName,
+                                        EnumSet<ConsumerOption> options,
+                                        Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused, QueueDeleted;
 
-    Collection<? extends ConsumerImpl> getConsumers();
+    Collection<? extends MessageInstanceConsumer> getConsumers();
 
     boolean verifySessionAccess(AMQSessionModel<?> session);
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java Sun Dec  4 10:17:55 2016
@@ -20,10 +20,13 @@
  */
 package org.apache.qpid.server.model;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 
 @ManagedObject(creatable = false, amqpName = "org.apache.qpid.Consumer")
-public interface Consumer<X extends Consumer<X>> extends ConfiguredObject<X>, ConsumerImpl
+public interface Consumer<X extends Consumer<X>> extends ConfiguredObject<X>, MessageInstanceConsumer
 {
     String DISTRIBUTION_MODE = "distributionMode";
     String EXCLUSIVE = "exclusive";
@@ -38,6 +41,8 @@ public interface Consumer<X extends Cons
     @ManagedContextDefault( name = SUSPEND_NOTIFICATION_PERIOD)
     long SUSPEND_NOTIFICATION_PERIOD_DEFAULT = 10000;
 
+    AtomicLong CONSUMER_NUMBER_GENERATOR = new AtomicLong(0);
+
     @DerivedAttribute
     String getLinkName();
 
@@ -73,4 +78,15 @@ public interface Consumer<X extends Cons
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetch")
     long getUnacknowledgedMessages();
 
+
+    AMQSessionModel getSessionModel();
+
+    long getConsumerNumber();
+
+    boolean isSuspended();
+
+    boolean seesRequeues();
+
+    boolean isActive();
+
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sun Dec  4 10:17:55 2016
@@ -74,7 +74,7 @@ import org.apache.qpid.filter.selector.T
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.updater.Task;
 import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.JMSSelectorFilter;
@@ -85,6 +85,7 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.messages.QueueMessages;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.message.MessageDeletedException;
 import org.apache.qpid.server.message.MessageInfo;
 import org.apache.qpid.server.message.MessageInfoImpl;
@@ -695,7 +696,7 @@ public abstract class AbstractQueue<X ex
                                          final FilterManager filters,
                                          final Class<? extends ServerMessage> messageClass,
                                          final String consumerName,
-                                         final EnumSet<ConsumerImpl.Option> optionSet,
+                                         final EnumSet<ConsumerOption> optionSet,
                                          final Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused, QueueDeleted
@@ -758,7 +759,7 @@ public abstract class AbstractQueue<X ex
                                                   FilterManager filters,
                                                   final Class<? extends ServerMessage> messageClass,
                                                   final String consumerName,
-                                                  EnumSet<ConsumerImpl.Option> optionSet,
+                                                  EnumSet<ConsumerOption> optionSet,
                                                   final Integer priority)
             throws ExistingExclusiveConsumer, ConsumerAccessRefused,
                    ExistingConsumerPreventsExclusive, QueueDeleted
@@ -843,13 +844,13 @@ public abstract class AbstractQueue<X ex
                 throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusive);
         }
 
-        boolean exclusive =  optionSet.contains(ConsumerImpl.Option.EXCLUSIVE);
-        boolean isTransient =  optionSet.contains(ConsumerImpl.Option.TRANSIENT);
+        boolean exclusive =  optionSet.contains(ConsumerOption.EXCLUSIVE);
+        boolean isTransient =  optionSet.contains(ConsumerOption.TRANSIENT);
 
-        if(_noLocal && !optionSet.contains(ConsumerImpl.Option.NO_LOCAL))
+        if(_noLocal && !optionSet.contains(ConsumerOption.NO_LOCAL))
         {
             optionSet = EnumSet.copyOf(optionSet);
-            optionSet.add(ConsumerImpl.Option.NO_LOCAL);
+            optionSet.add(ConsumerOption.NO_LOCAL);
         }
 
         if(exclusive && getConsumerCount() != 0)
@@ -891,7 +892,7 @@ public abstract class AbstractQueue<X ex
         if(_ensureNondestructiveConsumers)
         {
             optionSet = EnumSet.copyOf(optionSet);
-            optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES));
+            optionSet.removeAll(EnumSet.of(ConsumerOption.SEES_REQUEUES, ConsumerOption.ACQUIRES));
         }
 
         QueueConsumerImpl consumer = new QueueConsumerImpl(this,
@@ -1917,41 +1918,6 @@ public abstract class AbstractQueue<X ex
         return _queueStatistics.getAvailableCount() != 0;
     }
 
-    public static final class MessageContainer
-    {
-        private final MessageInstance _messageInstance;
-        private final MessageReference<?> _messageReference;
-        private final boolean _hasNoAvailableMessages;
-
-        public MessageContainer(final boolean hasNoAvailableMessages)
-        {
-            this(null, null, hasNoAvailableMessages);
-        }
-
-        public MessageContainer(final MessageInstance messageInstance,
-                                final MessageReference<?> messageReference, final boolean hasNoAvailableMessages)
-        {
-            _messageInstance = messageInstance;
-            _messageReference = messageReference;
-            _hasNoAvailableMessages = hasNoAvailableMessages;
-        }
-
-        public MessageInstance getMessageInstance()
-        {
-            return _messageInstance;
-        }
-
-        public MessageReference<?> getMessageReference()
-        {
-            return _messageReference;
-        }
-
-        public boolean hasNoAvailableMessages()
-        {
-            return _hasNoAvailableMessages;
-        }
-    }
-
     private static final MessageContainer NO_MESSAGES = new MessageContainer(true);
     private static final MessageContainer HAS_MESSAGES = new MessageContainer(false);
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Sun Dec  4 10:17:55 2016
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
 import org.apache.qpid.server.message.MessageInstance.EntryState;
@@ -183,7 +182,7 @@ public class DefinedGroupMessageGroupMan
             }
         }
 
-        ConsumerImpl assignedSub = group.getConsumer();
+        QueueConsumer<?> assignedSub = group.getConsumer();
 
         if(assignedSub == sub)
         {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Sun Dec  4 10:17:55 2016
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -220,7 +220,7 @@ public class LastValueQueueList extends
         }
 
         @Override
-        public void release(ConsumerImpl consumer)
+        public void release(MessageInstanceConsumer consumer)
         {
             super.release(consumer);
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Sun Dec  4 10:17:55 2016
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.Filterable;
@@ -47,9 +48,9 @@ import org.apache.qpid.server.logging.Ev
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -107,7 +108,7 @@ class QueueConsumerImpl
                       final String consumerName,
                       final FilterManager filters,
                       final Class<? extends ServerMessage> messageClass,
-                      EnumSet<Option> optionSet,
+                      EnumSet<ConsumerOption> optionSet,
                       final Integer priority)
     {
         super(parentsMap(queue, target.getSessionModel().getModelObject()),
@@ -116,9 +117,9 @@ class QueueConsumerImpl
         _sessionReference = target.getSessionModel().getConnectionReference();
         _consumerNumber = CONSUMER_NUMBER_GENERATOR.getAndIncrement();
         _filters = filters;
-        _acquires = optionSet.contains(Option.ACQUIRES);
-        _seesRequeues = optionSet.contains(Option.SEES_REQUEUES);
-        _isTransient = optionSet.contains(Option.TRANSIENT);
+        _acquires = optionSet.contains(ConsumerOption.ACQUIRES);
+        _seesRequeues = optionSet.contains(ConsumerOption.SEES_REQUEUES);
+        _isTransient = optionSet.contains(ConsumerOption.TRANSIENT);
         _target = target;
         _queue = queue;
         _linkName = consumerName;
@@ -134,7 +135,7 @@ class QueueConsumerImpl
     private static Map<String, Object> createAttributeMap(final AMQSessionModel sessionModel,
                                                           String linkName,
                                                           FilterManager filters,
-                                                          EnumSet<Option> optionSet,
+                                                          EnumSet<ConsumerOption> optionSet,
                                                           Integer priority)
     {
         Map<String,Object> attributes = new HashMap<String, Object>();
@@ -145,10 +146,10 @@ class QueueConsumerImpl
                       + "|"
                       + linkName;
         attributes.put(NAME, name);
-        attributes.put(EXCLUSIVE, optionSet.contains(Option.EXCLUSIVE));
-        attributes.put(NO_LOCAL, optionSet.contains(Option.NO_LOCAL));
-        attributes.put(DISTRIBUTION_MODE, optionSet.contains(Option.ACQUIRES) ? "MOVE" : "COPY");
-        attributes.put(DURABLE,optionSet.contains(Option.DURABLE));
+        attributes.put(EXCLUSIVE, optionSet.contains(ConsumerOption.EXCLUSIVE));
+        attributes.put(NO_LOCAL, optionSet.contains(ConsumerOption.NO_LOCAL));
+        attributes.put(DISTRIBUTION_MODE, optionSet.contains(ConsumerOption.ACQUIRES) ? "MOVE" : "COPY");
+        attributes.put(DURABLE,optionSet.contains(ConsumerOption.DURABLE));
         attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
         if(priority != null)
         {
@@ -220,9 +221,9 @@ class QueueConsumerImpl
     }
 
     @Override
-    public MessageSource getMessageSource()
+    public Object getIdentifier()
     {
-        return _queue;
+        return getConsumerNumber();
     }
 
     @Override
@@ -333,9 +334,9 @@ class QueueConsumerImpl
     }
 
     @Override
-    public AbstractQueue.MessageContainer pullMessage()
+    public MessageContainer pullMessage()
     {
-        AbstractQueue.MessageContainer messageContainer = _queue.deliverSingleMessage(this);
+        MessageContainer messageContainer = _queue.deliverSingleMessage(this);
         if (messageContainer != null)
         {
             _deliveredCount.incrementAndGet();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sun Dec  4 10:17:55 2016
@@ -33,8 +33,6 @@ public interface QueueEntry extends Mess
 
     boolean acquireOrSteal(final Runnable delayedAcquisitionTask);
 
-    QueueConsumer getDeliveredConsumer();
-
     boolean isQueueDeleted();
 
     QueueEntry getNextNode();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sun Dec  4 10:17:55 2016
@@ -30,11 +30,11 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDeletedException;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
@@ -55,7 +55,7 @@ public abstract class QueueEntryImpl imp
 
     private final MessageReference _message;
 
-    private Set<Long> _rejectedBy = null;
+    private Set<Object> _rejectedBy = null;
 
     private static final EntryState HELD_STATE = new EntryState()
     {
@@ -263,7 +263,7 @@ public abstract class QueueEntryImpl imp
         boolean acquired = acquire();
         if(!acquired)
         {
-            QueueConsumer consumer = getDeliveredConsumer();
+            QueueConsumer consumer = getAcquiringConsumer();
             acquired = removeAcquisitionFromConsumer(consumer);
             if(acquired)
             {
@@ -304,7 +304,7 @@ public abstract class QueueEntryImpl imp
         return acquired;
     }
 
-    public boolean acquire(ConsumerImpl sub)
+    public boolean acquire(MessageInstanceConsumer sub)
     {
         final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getUnstealableState());
         if(acquired)
@@ -315,7 +315,7 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+    public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
     {
         EntryState state = _state;
         if(state instanceof StealableConsumerAcquiredState
@@ -356,13 +356,13 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public ConsumerImpl getAcquiringConsumer()
+    public QueueConsumer<?> getAcquiringConsumer()
     {
-        ConsumerImpl consumer;
+        QueueConsumer<?> consumer;
         EntryState state = _state;
         if (state instanceof ConsumerAcquiredState)
         {
-            consumer = ((ConsumerAcquiredState)state).getConsumer();
+            consumer = ((ConsumerAcquiredState<QueueConsumer<?>>)state).getConsumer();
         }
         else
         {
@@ -372,14 +372,14 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public boolean isAcquiredBy(ConsumerImpl consumer)
+    public boolean isAcquiredBy(MessageInstanceConsumer consumer)
     {
         EntryState state = _state;
         return (state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState)state).getConsumer() == consumer);
     }
 
     @Override
-    public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
+    public boolean removeAcquisitionFromConsumer(MessageInstanceConsumer consumer)
     {
         EntryState state = _state;
         if(state instanceof StealableConsumerAcquiredState
@@ -410,7 +410,7 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public void release(ConsumerImpl consumer)
+    public void release(MessageInstanceConsumer consumer)
     {
         EntryState state = _state;
         if(isAcquiredBy(consumer) && _stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
@@ -469,24 +469,18 @@ public abstract class QueueEntryImpl imp
         return false;
     }
 
-    @Override
-    public QueueConsumer getDeliveredConsumer()
-    {
-        return (QueueConsumer) getAcquiringConsumer();
-    }
-
     public void reject()
     {
-        QueueConsumer consumer = getDeliveredConsumer();
+        QueueConsumer consumer = getAcquiringConsumer();
 
         if (consumer != null)
         {
             if (_rejectedBy == null)
             {
-                _rejectedBy = new HashSet<Long>();
+                _rejectedBy = new HashSet<>();
             }
 
-            _rejectedBy.add(consumer.getConsumerNumber());
+            _rejectedBy.add(consumer.getIdentifier());
         }
         else
         {
@@ -494,12 +488,13 @@ public abstract class QueueEntryImpl imp
         }
     }
 
-    public boolean isRejectedBy(ConsumerImpl consumer)
+    @Override
+    public boolean isRejectedBy(MessageInstanceConsumer consumer)
     {
 
         if (_rejectedBy != null) // We have consumers that rejected this message
         {
-            return _rejectedBy.contains(consumer.getConsumerNumber());
+            return _rejectedBy.contains(consumer.getIdentifier());
         }
         else // This message hasn't been rejected yet.
         {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java Sun Dec  4 10:17:55 2016
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.message.MessageSource;
@@ -95,7 +95,7 @@ public class TrustStoreMessageSource ext
                                 final FilterManager filters,
                                 final Class<? extends ServerMessage> messageClass,
                                 final String consumerName,
-                                final EnumSet<ConsumerImpl.Option> options, final Integer priority)
+                                final EnumSet<ConsumerOption> options, final Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused, QueueDeleted
     {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java Sun Dec  4 10:17:55 2016
@@ -29,18 +29,19 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AbstractQueue;
+import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -86,7 +87,7 @@ public abstract class AbstractSystemMess
                                 final FilterManager filters,
                                 final Class<? extends ServerMessage> messageClass,
                                 final String consumerName,
-                                final EnumSet<ConsumerImpl.Option> options, final Integer priority)
+                                final EnumSet<ConsumerOption> options, final Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused, QueueDeleted
     {
@@ -108,14 +109,14 @@ public abstract class AbstractSystemMess
         return true;
     }
 
-    protected class Consumer implements ConsumerImpl
+    protected class Consumer implements MessageInstanceConsumer
     {
 
-        private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement();
         private final List<PropertiesMessageInstance> _queue =
                 Collections.synchronizedList(new ArrayList<PropertiesMessageInstance>());
         private final ConsumerTarget _target;
         private final String _name;
+        private final Object _identifier = new Object();
 
 
         public Consumer(final String consumerName, ConsumerTarget target)
@@ -134,13 +135,19 @@ public abstract class AbstractSystemMess
         }
 
         @Override
+        public Object getIdentifier()
+        {
+            return _identifier;
+        }
+
+        @Override
         public ConsumerTarget getTarget()
         {
             return _target;
         }
 
         @Override
-        public AbstractQueue.MessageContainer pullMessage()
+        public MessageContainer pullMessage()
         {
             if (!_queue.isEmpty())
             {
@@ -148,7 +155,7 @@ public abstract class AbstractSystemMess
                 if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
                 {
                     _queue.remove(0);
-                    return new AbstractQueue.MessageContainer(propertiesMessageInstance, null, false);
+                    return new MessageContainer(propertiesMessageInstance, null, false);
                 }
             }
             return null;
@@ -164,54 +171,6 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public long getBytesOut()
-        {
-            return 0;
-        }
-
-        @Override
-        public long getMessagesOut()
-        {
-            return 0;
-        }
-
-        @Override
-        public long getUnacknowledgedBytes()
-        {
-            return 0;
-        }
-
-        @Override
-        public long getUnacknowledgedMessages()
-        {
-            return 0;
-        }
-
-        @Override
-        public AMQSessionModel getSessionModel()
-        {
-            return _target.getSessionModel();
-        }
-
-        @Override
-        public MessageSource getMessageSource()
-        {
-            return AbstractSystemMessageSource.this;
-        }
-
-        @Override
-        public long getConsumerNumber()
-        {
-            return _id;
-        }
-
-        @Override
-        public boolean isSuspended()
-        {
-            return !isActive();
-        }
-
-        @Override
         public boolean isClosed()
         {
             return false;
@@ -224,24 +183,12 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean seesRequeues()
-        {
-            return false;
-        }
-
-        @Override
         public void close()
         {
             _consumers.remove(this);
         }
 
         @Override
-        public boolean isActive()
-        {
-            return _target.isNotifyWorkDesired();
-        }
-
-        @Override
         public String getName()
         {
             return _name;
@@ -307,7 +254,7 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public ConsumerImpl getAcquiringConsumer()
+        public Consumer getAcquiringConsumer()
         {
             return _consumer;
         }
@@ -319,13 +266,13 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean isAcquiredBy(final ConsumerImpl consumer)
+        public boolean isAcquiredBy(final MessageInstanceConsumer consumer)
         {
             return consumer == _consumer && !isDeleted();
         }
 
         @Override
-        public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+        public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer)
         {
             return consumer == _consumer;
         }
@@ -343,19 +290,13 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public Consumer getDeliveredConsumer()
-        {
-            return isDeleted() ? null : _consumer;
-        }
-
-        @Override
         public void reject()
         {
             delete();
         }
 
         @Override
-        public boolean isRejectedBy(final ConsumerImpl consumer)
+        public boolean isRejectedBy(final MessageInstanceConsumer consumer)
         {
             return false;
         }
@@ -373,13 +314,13 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public boolean acquire(final ConsumerImpl sub)
+        public boolean acquire(final MessageInstanceConsumer sub)
         {
             return false;
         }
 
         @Override
-        public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+        public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
         {
             return false;
         }
@@ -435,7 +376,7 @@ public abstract class AbstractSystemMess
         }
 
         @Override
-        public void release(ConsumerImpl consumer)
+        public void release(MessageInstanceConsumer consumer)
         {
             release();
         }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java Sun Dec  4 10:17:55 2016
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.message.ServerMessage;
@@ -51,7 +51,7 @@ public class VirtualHostPropertiesNode e
                                 final FilterManager filters,
                                 final Class<? extends ServerMessage> messageClass,
                                 final String consumerName,
-                                final EnumSet<ConsumerImpl.Option> options, final Integer priority)
+                                final EnumSet<ConsumerOption> options, final Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused, QueueDeleted
     {

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Sun Dec  4 10:17:55 2016
@@ -35,6 +35,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.BrokerModel;
 import org.apache.qpid.server.model.ConfiguredObjectFactory;
@@ -44,7 +45,7 @@ import org.apache.qpid.server.model.Queu
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.queue.AbstractQueue;
+import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.network.Ticker;
@@ -59,7 +60,7 @@ public class TestConsumerTarget implemen
     private ArrayList<MessageInstance> _messages = new ArrayList<MessageInstance>();
 
     private boolean _isActive = true;
-    private ConsumerImpl _consumer;
+    private MessageInstanceConsumer _consumer;
     private MockSessionModel _sessionModel = new MockSessionModel();
     private boolean _notifyDesired;
 
@@ -118,7 +119,7 @@ public class TestConsumerTarget implemen
     {
     }
 
-    public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+    public long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
     {
         long size = entry.getMessage().getSize();
         if (_messages.contains(entry))
@@ -158,13 +159,13 @@ public class TestConsumerTarget implemen
     }
 
     @Override
-    public void consumerAdded(final ConsumerImpl sub)
+    public void consumerAdded(final MessageInstanceConsumer sub)
     {
         _consumer = sub;
     }
 
     @Override
-    public ListenableFuture<Void> consumerRemoved(final ConsumerImpl sub)
+    public ListenableFuture<Void> consumerRemoved(final MessageInstanceConsumer sub)
     {
        close();
         return Futures.immediateFuture(null);
@@ -179,7 +180,7 @@ public class TestConsumerTarget implemen
     @Override
     public boolean processPending()
     {
-        AbstractQueue.MessageContainer messageContainer = _consumer.pullMessage();
+        MessageContainer messageContainer = _consumer.pullMessage();
         if (messageContainer == null)
         {
             return false;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Sun Dec  4 10:17:55 2016
@@ -47,13 +47,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.TestConsumerTarget;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.exchange.DirectExchangeImpl;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
@@ -171,8 +172,8 @@ abstract class AbstractQueueTestBase ext
 
         // Check adding a consumer adds it to the queue
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                                  ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                          EnumSet.of(ConsumerOption.ACQUIRES,
+                                                                     ConsumerOption.SEES_REQUEUES), 0);
         assertEquals("Queue does not have consumer", 1,
                      _queue.getConsumerCount());
         assertEquals("Queue does not have active consumer", 1,
@@ -203,8 +204,8 @@ abstract class AbstractQueueTestBase ext
         ServerMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA, null, null);
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                                  ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                          EnumSet.of(ConsumerOption.ACQUIRES,
+                                                                     ConsumerOption.SEES_REQUEUES), 0);
         while(_consumerTarget.processPending());
         assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
         assertNull("There should be no releasedEntry after an enqueue",
@@ -221,8 +222,8 @@ abstract class AbstractQueueTestBase ext
         _queue.enqueue(messageA, null, null);
         _queue.enqueue(messageB, null, null);
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                                  ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                          EnumSet.of(ConsumerOption.ACQUIRES,
+                                                                     ConsumerOption.SEES_REQUEUES), 0);
         while(_consumerTarget.processPending());
         assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
         assertNull("There should be no releasedEntry after enqueues",
@@ -244,8 +245,8 @@ abstract class AbstractQueueTestBase ext
         when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
         _queue.enqueue(messageA, null, null);
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                                                     ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                          EnumSet.of(ConsumerOption.ACQUIRES,
+                                                                     ConsumerOption.SEES_REQUEUES), 0);
         while(_consumerTarget.processPending());
 
         assertEquals("Message which was not yet valid was received", 0, _consumerTarget.getMessages().size());
@@ -270,8 +271,8 @@ abstract class AbstractQueueTestBase ext
         when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
         _queue.enqueue(messageA, null, null);
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                                                     ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                          EnumSet.of(ConsumerOption.ACQUIRES,
+                                                                     ConsumerOption.SEES_REQUEUES), 0);
         while(_consumerTarget.processPending());
 
         assertEquals("Message was held despite queue not having holding enabled", 1, _consumerTarget.getMessages().size());
@@ -296,8 +297,8 @@ abstract class AbstractQueueTestBase ext
         _queue.enqueue(messageB, null, null);
 
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                                                     ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                          EnumSet.of(ConsumerOption.ACQUIRES,
+                                                                     ConsumerOption.SEES_REQUEUES), 0);
         while(_consumerTarget.processPending());
 
         assertEquals("Expect one message (message B)", 1, _consumerTarget.getMessages().size());
@@ -324,8 +325,8 @@ abstract class AbstractQueueTestBase ext
 
 
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                                      ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                          EnumSet.of(ConsumerOption.ACQUIRES,
+                                                                     ConsumerOption.SEES_REQUEUES), 0);
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
         EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
@@ -380,7 +381,7 @@ abstract class AbstractQueueTestBase ext
             }
 
             @Override
-            public long send(ConsumerImpl consumer, MessageInstance entry, boolean batch)
+            public long send(MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
             {
                 try
                 {
@@ -394,8 +395,8 @@ abstract class AbstractQueueTestBase ext
         };
 
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES,
-                                                      ConsumerImpl.Option.ACQUIRES), 0);
+                                                          EnumSet.of(ConsumerOption.SEES_REQUEUES,
+                                                                     ConsumerOption.ACQUIRES), 0);
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
         EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
@@ -465,8 +466,8 @@ abstract class AbstractQueueTestBase ext
         ServerMessage messageC = createMessage(new Long(26));
 
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                                      ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                          EnumSet.of(ConsumerOption.ACQUIRES,
+                                                                     ConsumerOption.SEES_REQUEUES), 0);
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
         EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
@@ -518,12 +519,12 @@ abstract class AbstractQueueTestBase ext
 
 
         QueueConsumer consumer1 = (QueueConsumer) _queue.addConsumer(target1, null, messageA.getClass(), "test",
-                                                                     EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                                                    ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                                     EnumSet.of(ConsumerOption.ACQUIRES,
+                                                                                ConsumerOption.SEES_REQUEUES), 0);
 
         QueueConsumer consumer2 = (QueueConsumer) _queue.addConsumer(target2, null, messageA.getClass(), "test",
-                                                                     EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                                                    ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                                     EnumSet.of(ConsumerOption.ACQUIRES,
+                                                                                ConsumerOption.SEES_REQUEUES), 0);
 
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
@@ -562,8 +563,8 @@ abstract class AbstractQueueTestBase ext
         // Check adding an exclusive consumer adds it to the queue
 
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.EXCLUSIVE, ConsumerImpl.Option.ACQUIRES,
-                                                      ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                          EnumSet.of(ConsumerOption.EXCLUSIVE, ConsumerOption.ACQUIRES,
+                                                                     ConsumerOption.SEES_REQUEUES), 0);
 
         assertEquals("Queue does not have consumer", 1,
                      _queue.getConsumerCount());
@@ -585,8 +586,8 @@ abstract class AbstractQueueTestBase ext
         {
 
             _queue.addConsumer(subB, null, messageA.getClass(), "test",
-                               EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                          ConsumerImpl.Option.SEES_REQUEUES), 0);
+                               EnumSet.of(ConsumerOption.ACQUIRES,
+                                          ConsumerOption.SEES_REQUEUES), 0);
 
         }
         catch (MessageSource.ExistingExclusiveConsumer e)
@@ -599,14 +600,14 @@ abstract class AbstractQueueTestBase ext
         // existing consumer
         _consumer.close();
         _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                                  ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                          EnumSet.of(ConsumerOption.ACQUIRES,
+                                                                     ConsumerOption.SEES_REQUEUES), 0);
 
         try
         {
 
             _consumer = (QueueConsumer<?>) _queue.addConsumer(subB, null, messageA.getClass(), "test",
-                                                              EnumSet.of(ConsumerImpl.Option.EXCLUSIVE), 0);
+                                                              EnumSet.of(ConsumerOption.EXCLUSIVE), 0);
 
         }
         catch (MessageSource.ExistingConsumerPreventsExclusive e)

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java Sun Dec  4 10:17:55 2016
@@ -20,11 +20,11 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -61,7 +61,7 @@ public class MockMessageInstance impleme
     }
 
     @Override
-    public ConsumerImpl getAcquiringConsumer()
+    public MessageInstanceConsumer getAcquiringConsumer()
     {
         return null;
     }
@@ -73,13 +73,13 @@ public class MockMessageInstance impleme
     }
 
     @Override
-    public boolean isAcquiredBy(final ConsumerImpl consumer)
+    public boolean isAcquiredBy(final MessageInstanceConsumer consumer)
     {
         return false;
     }
 
     @Override
-    public boolean removeAcquisitionFromConsumer(final ConsumerImpl consumer)
+    public boolean removeAcquisitionFromConsumer(final MessageInstanceConsumer consumer)
     {
         return false;
     }
@@ -95,13 +95,13 @@ public class MockMessageInstance impleme
     }
 
     @Override
-    public boolean acquire(final ConsumerImpl sub)
+    public boolean acquire(final MessageInstanceConsumer sub)
     {
         return false;
     }
 
     @Override
-    public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+    public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
     {
         return false;
     }
@@ -117,11 +117,6 @@ public class MockMessageInstance impleme
         return false;
     }
 
-    public ConsumerImpl getDeliveredConsumer()
-    {
-        return null;
-    }
-
     public boolean getDeliveredToConsumer()
     {
         return false;
@@ -147,7 +142,7 @@ public class MockMessageInstance impleme
     }
 
     @Override
-    public boolean isRejectedBy(final ConsumerImpl consumer)
+    public boolean isRejectedBy(final MessageInstanceConsumer consumer)
     {
         return false;
     }
@@ -158,7 +153,7 @@ public class MockMessageInstance impleme
     }
 
     @Override
-    public void release(final ConsumerImpl release)
+    public void release(final MessageInstanceConsumer release)
     {
     }
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java Sun Dec  4 10:17:55 2016
@@ -28,7 +28,7 @@ import java.util.EnumSet;
 
 import junit.framework.AssertionFailedError;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
@@ -63,7 +63,7 @@ public class PriorityQueueTest extends A
         queue.enqueue(createMessage(9L, (byte) 0), null, null);
 
         // Register subscriber
-        queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class), 0);
+        queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerOption.class), 0);
 
         while(getConsumer().processPending());
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Sun Dec  4 10:17:55 2016
@@ -145,7 +145,9 @@ public abstract class QueueEntryImplTest
         StealableConsumerAcquiredState
                 owningState = new StealableConsumerAcquiredState(consumer);
         when(consumer.getOwningState()).thenReturn(owningState);
-        when(consumer.getConsumerNumber()).thenReturn(_consumerId++);
+        final Long consumerNum = _consumerId++;
+        when(consumer.getConsumerNumber()).thenReturn(consumerNum);
+        when(consumer.getIdentifier()).thenReturn(consumerNum);
         return consumer;
     }
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Sun Dec  4 10:17:55 2016
@@ -28,10 +28,10 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.TestConsumerTarget;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
@@ -56,8 +56,8 @@ public class StandardQueueTest extends A
         ServerMessage message = createMessage(25l);
         QueueConsumer consumer =
                 (QueueConsumer) getQueue().addConsumer(getConsumerTarget(), null, message.getClass(), "test",
-                                                       EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                                  ConsumerImpl.Option.SEES_REQUEUES), 0);
+                                                       EnumSet.of(ConsumerOption.ACQUIRES,
+                                                                  ConsumerOption.SEES_REQUEUES), 0);
 
         getQueue().enqueue(message, null, null);
         consumer.close();
@@ -82,8 +82,8 @@ public class StandardQueueTest extends A
                           null,
                           createMessage(-1l).getClass(),
                           "test",
-                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                     ConsumerImpl.Option.SEES_REQUEUES), 0);
+                          EnumSet.of(ConsumerOption.ACQUIRES,
+                                     ConsumerOption.SEES_REQUEUES), 0);
 
         // put test messages into a queue
         putGivenNumberOfMessages(queue, 4);
@@ -139,7 +139,7 @@ public class StandardQueueTest extends A
              * @param entry
              * @param batch
              */
-            public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+            public long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
             {
                 long size = super.send(consumer, entry, batch);
                 latch.countDown();
@@ -152,8 +152,8 @@ public class StandardQueueTest extends A
                               null,
                               entries.get(0).getMessage().getClass(),
                               "test",
-                              EnumSet.of(ConsumerImpl.Option.ACQUIRES,
-                                         ConsumerImpl.Option.SEES_REQUEUES), 0);
+                              EnumSet.of(ConsumerOption.ACQUIRES,
+                                         ConsumerOption.SEES_REQUEUES), 0);
 
 
         // wait up to 1 minute for message receipt
@@ -271,7 +271,7 @@ public class StandardQueueTest extends A
         }
 
         @Override
-        public boolean acquire(ConsumerImpl sub)
+        public boolean acquire(MessageInstanceConsumer sub)
         {
             if(_message.getMessageNumber() % 2 == 0)
             {
@@ -284,7 +284,7 @@ public class StandardQueueTest extends A
         }
 
         @Override
-        public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+        public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
         {
             return true;
         }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java?rev=1772514&r1=1772513&r2=1772514&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/security/TrustStoreMessageSourceTest.java Sun Dec  4 10:17:55 2016
@@ -36,13 +36,14 @@ import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.TrustStore;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.queue.AbstractQueue;
+import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.TestMemoryMessageStore;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -72,12 +73,12 @@ public class TrustStoreMessageSourceTest
 
     public void testAddConsumer() throws Exception
     {
-        final EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
+        final EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
         final ConsumerTarget target = mock(ConsumerTarget.class);
         when(target.allocateCredit(any(ServerMessage.class))).thenReturn(true);
 
-        ConsumerImpl consumer = _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
-        final AbstractQueue.MessageContainer messageContainer = consumer.pullMessage();
+        MessageInstanceConsumer consumer = _trustStoreMessageSource.addConsumer(target, null, ServerMessage.class, getTestName(), options, 0);
+        final MessageContainer messageContainer = consumer.pullMessage();
         assertNotNull("Could not pull message of TrustStore", messageContainer);
         final ServerMessage message = messageContainer.getMessageInstance().getMessage();
         assertCertificates(getCertificatesFromMessage(message));



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org