You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/06 01:41:08 UTC

svn commit: r1565024 - in /qpid/branches/java-broker-amqp-1-0-management/java: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/messag...

Author: rgodfrey
Date: Thu Feb  6 00:41:07 2014
New Revision: 1565024

URL: http://svn.apache.org/r1565024
Log:
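Hide queue-internal methods from the public interfaces of Consumer and MessageInstance (they become package-private on QueueConsumer; MessageInstance, MessageSource, BaseQueue and AMQQueue are now parameterised by consumer type).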

Modified:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java Thu Feb  6 00:41:07 2014
@@ -44,8 +44,6 @@ public interface Consumer
 
     LogActor getLogActor();
 
-    boolean isTransient();
-
     long getBytesOut();
 
     long getMessagesOut();
@@ -63,16 +61,12 @@ public interface Consumer
 
     AMQSessionModel getSessionModel();
 
-    MessageInstance.ConsumerAcquiredState getOwningState();
-
     void setNoLocal(boolean noLocal);
 
     long getId();
 
     boolean isSuspended();
 
-    boolean hasInterest(MessageInstance msg);
-
     boolean isClosed();
 
     boolean acquires();
@@ -81,17 +75,6 @@ public interface Consumer
 
     void close() throws AMQException;
 
-    void send(MessageInstance entry, boolean batch) throws AMQException;
-
-    boolean resend(MessageInstance entry) throws AMQException;
-
-    void flushBatched();
-
-    void queueDeleted();
-
-
-    boolean wouldSuspend(MessageInstance msg);
-
     boolean trySendLock();
 
 
@@ -99,16 +82,12 @@ public interface Consumer
 
     void releaseSendLock();
 
-    void restoreCredit(final MessageInstance queueEntry);
-
     void setStateListener(final StateChangeListener<? extends Consumer, State> listener);
 
     public State getState();
 
     boolean isActive();
 
-    void queueEmpty() throws AMQException;
-
     String getName();
 
     void flush() throws AMQException;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Thu Feb  6 00:41:07 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.BindingMessages;
@@ -427,10 +428,10 @@ public abstract class AbstractExchange i
         return queues;
     }
 
-    public final int send(final ServerMessage message,
+    public final <C extends Consumer> int send(final ServerMessage message,
                           final InstanceProperties instanceProperties,
                           final ServerTransaction txn,
-                          final Action<MessageInstance> postEnqueueAction)
+                          final Action<MessageInstance<C>> postEnqueueAction)
     {
         List<? extends BaseQueue> queues = route(message, instanceProperties);
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Thu Feb  6 00:41:07 2014
@@ -32,6 +32,7 @@ import org.apache.qpid.AMQInternalExcept
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
@@ -334,10 +335,10 @@ public class DefaultExchange implements 
         return _id;
     }
 
-    public final int send(final ServerMessage message,
+    public final <C extends Consumer> int send(final ServerMessage message,
                           final InstanceProperties instanceProperties,
                           final ServerTransaction txn,
-                          final Action<MessageInstance> postEnqueueAction)
+                          final Action<MessageInstance<C>> postEnqueueAction)
     {
         final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
         if(q == null)

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java Thu Feb  6 00:41:07 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.message;
 
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 
@@ -36,8 +37,8 @@ public interface MessageDestination exte
      * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
      * @return the number of queues in which the message was enqueued performed
      */
-    int send(ServerMessage message,
+    <C extends Consumer> int send(ServerMessage message,
              InstanceProperties instanceProperties,
              ServerTransaction txn,
-             Action<MessageInstance> postEnqueueAction);
+             Action<MessageInstance<C>> postEnqueueAction);
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Thu Feb  6 00:41:07 2014
@@ -30,7 +30,7 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
 
-public interface MessageInstance
+public interface MessageInstance<C extends Consumer>
 {
 
 
@@ -45,33 +45,33 @@ public interface MessageInstance
 
     void decrementDeliveryCount();
 
-    void addStateChangeListener(StateChangeListener<MessageInstance, State> listener);
+    void addStateChangeListener(StateChangeListener<MessageInstance<C>, State> listener);
 
-    boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener);
+    boolean removeStateChangeListener(StateChangeListener<MessageInstance<C>, State> listener);
 
     boolean acquiredByConsumer();
 
-    boolean isAcquiredBy(Consumer consumer);
+    boolean isAcquiredBy(C consumer);
 
     void setRedelivered();
 
     boolean isRedelivered();
 
-    Consumer getDeliveredConsumer();
+    C getDeliveredConsumer();
 
     void reject();
 
-    boolean isRejectedBy(Consumer consumer);
+    boolean isRejectedBy(C consumer);
 
     boolean getDeliveredToConsumer();
 
     boolean expired() throws AMQException;
 
-    boolean acquire(Consumer sub);
+    boolean acquire(C sub);
 
     int getMaximumDeliveryCount();
 
-    int routeToAlternate(Action<MessageInstance> action, ServerTransaction txn);
+    int routeToAlternate(Action<MessageInstance<C>> action, ServerTransaction txn);
 
     Filterable asFilterable();
 
@@ -161,11 +161,11 @@ public interface MessageInstance
         }
     }
 
-    public final class ConsumerAcquiredState extends EntryState
+    public final class ConsumerAcquiredState<C extends Consumer> extends EntryState
     {
-        private final Consumer _consumer;
+        private final C _consumer;
 
-        public ConsumerAcquiredState(Consumer consumer)
+        public ConsumerAcquiredState(C consumer)
         {
             _consumer = consumer;
         }
@@ -176,7 +176,7 @@ public interface MessageInstance
             return State.ACQUIRED;
         }
 
-        public Consumer getConsumer()
+        public C getConsumer()
         {
             return _consumer;
         }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Thu Feb  6 00:41:07 2014
@@ -32,13 +32,13 @@ import org.apache.qpid.server.store.Tran
 import java.util.Collection;
 import java.util.EnumSet;
 
-public interface MessageSource extends TransactionLogResource, MessageNode
+public interface MessageSource<C extends Consumer> extends TransactionLogResource, MessageNode
 {
-    Consumer addConsumer(ConsumerTarget target, FilterManager filters,
+    C addConsumer(ConsumerTarget target, FilterManager filters,
                          Class<? extends ServerMessage> messageClass,
                          String consumerName, EnumSet<Consumer.Option> options) throws AMQException;
 
-    Collection<Consumer> getConsumers();
+    Collection<C> getConsumers();
 
     void addConsumerRegistrationListener(ConsumerRegistrationListener listener);
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Thu Feb  6 00:41:07 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.model;
 
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.MessageStore;
@@ -144,11 +145,11 @@ public interface VirtualHost extends Con
 
     public static interface Transaction
     {
-        void dequeue(QueueEntry entry);
+        void dequeue(MessageInstance entry);
 
-        void copy(QueueEntry entry, Queue queue);
+        void copy(MessageInstance entry, Queue queue);
 
-        void move(QueueEntry entry, Queue queue);
+        void move(MessageInstance entry, Queue queue);
 
     }
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Thu Feb  6 00:41:07 2014
@@ -44,6 +44,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
@@ -759,11 +760,11 @@ public final class VirtualHostAdapter ex
 
         op.withinTransaction(new Transaction()
         {
-            public void dequeue(final QueueEntry entry)
+            public void dequeue(final MessageInstance entry)
             {
                 if(entry.acquire())
                 {
-                    txn.dequeue(entry.getQueue(), entry.getMessage(), new ServerTransaction.Action()
+                    txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action()
                     {
                         public void postCommit()
                         {
@@ -777,7 +778,7 @@ public final class VirtualHostAdapter ex
                 }
             }
 
-            public void copy(QueueEntry entry, Queue queue)
+            public void copy(MessageInstance entry, Queue queue)
             {
                 final ServerMessage message = entry.getMessage();
                 final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
@@ -803,7 +804,7 @@ public final class VirtualHostAdapter ex
 
             }
 
-            public void move(final QueueEntry entry, Queue queue)
+            public void move(final MessageInstance entry, Queue queue)
             {
                 final ServerMessage message = entry.getMessage();
                 final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
@@ -830,7 +831,7 @@ public final class VirtualHostAdapter ex
                                         entry.release();
                                     }
                                 });
-                    txn.dequeue(entry.getQueue(), message,
+                    txn.dequeue(entry.getOwningResource(), message,
                                 new ServerTransaction.Action()
                                 {
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Feb  6 00:41:07 2014
@@ -36,7 +36,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
-public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker
+public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, ExchangeReferrer, BaseQueue<C>, MessageSource<C>, CapacityChecker
 {
 
     public interface NotificationListener
@@ -183,8 +183,6 @@ public interface AMQQueue extends Compar
 
     Set<NotificationCheck> getNotificationChecks();
 
-    void flushConsumer(final Consumer sub) throws AMQException;
-
     void deliverAsync();
 
     void stop();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Thu Feb  6 00:41:07 2014
@@ -22,15 +22,15 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.Action;
 
-public interface BaseQueue extends TransactionLogResource
+public interface BaseQueue<C extends Consumer> extends TransactionLogResource
 {
-    void enqueue(ServerMessage message) throws AMQException;
-    void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException;
+    void enqueue(ServerMessage message, Action<MessageInstance<C>> action) throws AMQException;
 
     boolean isDurable();
     boolean isDeleted();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Thu Feb  6 00:41:07 2014
@@ -43,7 +43,7 @@ public class ConflationQueueList extends
     private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
     private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
 
-    public ConflationQueueList(AMQQueue queue, String conflationKey)
+    public ConflationQueueList(AMQQueue<QueueConsumer> queue, String conflationKey)
     {
         super(queue);
         _conflationKey = conflationKey;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Thu Feb  6 00:41:07 2014
@@ -241,17 +241,17 @@ public class DefinedGroupMessageGroupMan
         return groupVal;
     }
 
-    private class GroupStateChangeListener implements StateChangeListener<MessageInstance, QueueEntry.State>
+    private class GroupStateChangeListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State>
     {
         private final Group _group;
 
         public GroupStateChangeListener(final Group group,
-                                        final MessageInstance entry)
+                                        final MessageInstance<QueueConsumer> entry)
         {
             _group = group;
         }
 
-        public void stateChanged(final MessageInstance entry,
+        public void stateChanged(final MessageInstance<QueueConsumer> entry,
                                  final MessageInstance.State oldState,
                                  final MessageInstance.State newState)
         {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Thu Feb  6 00:41:07 2014
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
 
 public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
 {
-    private final AMQQueue _queue;
+    private final AMQQueue<QueueConsumer> _queue;
     private final PriorityQueueEntrySubList[] _priorityLists;
     private final int _priorities;
     private final int _priorityOffset;
@@ -46,7 +46,7 @@ public class PriorityQueueList implement
         return _priorities;
     }
 
-    public AMQQueue getQueue()
+    public AMQQueue<QueueConsumer> getQueue()
     {
         return _queue;
     }
@@ -166,7 +166,7 @@ public class PriorityQueueList implement
     {
         private int _listPriority;
 
-        public PriorityQueueEntrySubList(AMQQueue queue, int listPriority)
+        public PriorityQueueEntrySubList(AMQQueue<QueueConsumer> queue, int listPriority)
         {
             super(queue);
             _listPriority = listPriority;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Thu Feb  6 00:41:07 2014
@@ -213,32 +213,27 @@ class QueueConsumer<T extends ConsumerTa
         }
     }
 
-    @Override
-    public void flushBatched()
+    void flushBatched()
     {
         _target.flushBatched();
     }
 
-    @Override
-    public void queueDeleted()
+    void queueDeleted()
     {
         _target.queueDeleted();
     }
 
-    @Override
-    public boolean wouldSuspend(final MessageInstance msg)
+    boolean wouldSuspend(final MessageInstance msg)
     {
         return !_target.allocateCredit(msg.getMessage());
     }
 
-    @Override
-    public void restoreCredit(final MessageInstance queueEntry)
+    void restoreCredit(final MessageInstance queueEntry)
     {
         _target.restoreCredit(queueEntry.getMessage());
     }
 
-    @Override
-    public void queueEmpty() throws AMQException
+    void queueEmpty() throws AMQException
     {
         _target.queueEmpty();
     }
@@ -298,8 +293,7 @@ class QueueConsumer<T extends ConsumerTa
         getQueue().flushConsumer(this);
     }
 
-    @Override
-    public boolean resend(final MessageInstance entry) throws AMQException
+    boolean resend(final MessageInstance entry) throws AMQException
     {
         return getQueue().resend((QueueEntry)entry, this);
     }
@@ -430,7 +424,7 @@ class QueueConsumer<T extends ConsumerTa
         return _createTime;
     }
 
-    public final MessageInstance.ConsumerAcquiredState getOwningState()
+    final MessageInstance.ConsumerAcquiredState getOwningState()
     {
         return _owningState;
     }
@@ -465,10 +459,15 @@ class QueueConsumer<T extends ConsumerTa
         return _deliveredCount.longValue();
     }
 
-    public final void send(final MessageInstance entry, final boolean batch) throws AMQException
+    final void send(final QueueEntry entry, final boolean batch) throws AMQException
     {
         _deliveredCount.incrementAndGet();
-        _deliveredBytes.addAndGet(entry.getMessage().getSize());
+        ServerMessage message = entry.getMessage();
+        if(message == null)
+        {
+            throw new AMQException("message was null!");
+        }
+        _deliveredBytes.addAndGet(message.getSize());
         _target.send(entry, batch);
     }
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Thu Feb  6 00:41:07 2014
@@ -22,10 +22,10 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.MessageInstance;
 
-public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
+public interface QueueEntry extends MessageInstance<QueueConsumer>, Comparable<QueueEntry>
 {
 
-    AMQQueue getQueue();
+    AMQQueue<QueueConsumer> getQueue();
 
     long getSize();
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Feb  6 00:41:07 2014
@@ -63,7 +63,7 @@ public abstract class QueueEntryImpl imp
         (QueueEntryImpl.class, EntryState.class, "_state");
 
 
-    private volatile Set<StateChangeListener<MessageInstance, State>> _stateChangeListeners;
+    private volatile Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> _stateChangeListeners;
 
     private static final
         AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -138,7 +138,7 @@ public abstract class QueueEntryImpl imp
         return _entryId;
     }
 
-    public AMQQueue getQueue()
+    public AMQQueue<QueueConsumer> getQueue()
     {
         return _queueEntryList.getQueue();
     }
@@ -202,7 +202,7 @@ public abstract class QueueEntryImpl imp
         return acquired;
     }
 
-    public boolean acquire(Consumer sub)
+    public boolean acquire(QueueConsumer sub)
     {
         final boolean acquired = acquire(sub.getOwningState());
         if(acquired)
@@ -218,7 +218,7 @@ public abstract class QueueEntryImpl imp
         return (_state instanceof ConsumerAcquiredState);
     }
 
-    public boolean isAcquiredBy(Consumer consumer)
+    public boolean isAcquiredBy(QueueConsumer consumer)
     {
         EntryState state = _state;
         return state instanceof ConsumerAcquiredState
@@ -264,12 +264,12 @@ public abstract class QueueEntryImpl imp
         return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED));
     }
 
-    public Consumer getDeliveredConsumer()
+    public QueueConsumer getDeliveredConsumer()
     {
         EntryState state = _state;
         if (state instanceof ConsumerAcquiredState)
         {
-            return ((ConsumerAcquiredState) state).getConsumer();
+            return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
         }
         else
         {
@@ -279,7 +279,7 @@ public abstract class QueueEntryImpl imp
 
     public void reject()
     {
-        Consumer consumer = getDeliveredConsumer();
+        QueueConsumer consumer = getDeliveredConsumer();
 
         if (consumer != null)
         {
@@ -296,7 +296,7 @@ public abstract class QueueEntryImpl imp
         }
     }
 
-    public boolean isRejectedBy(Consumer consumer)
+    public boolean isRejectedBy(QueueConsumer consumer)
     {
 
         if (_rejectedBy != null) // We have consumers that rejected this message
@@ -333,7 +333,7 @@ public abstract class QueueEntryImpl imp
 
     private void notifyStateChange(final State oldState, final State newState)
     {
-        for(StateChangeListener<MessageInstance, State> l : _stateChangeListeners)
+        for(StateChangeListener<MessageInstance<QueueConsumer>, State> l : _stateChangeListeners)
         {
             l.stateChanged(this, oldState, newState);
         }
@@ -364,7 +364,7 @@ public abstract class QueueEntryImpl imp
         dispose();
     }
 
-    public int routeToAlternate(final Action<MessageInstance> action, ServerTransaction txn)
+    public int routeToAlternate(final Action<MessageInstance<QueueConsumer>> action, ServerTransaction txn)
     {
         final AMQQueue currentQueue = getQueue();
         Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -376,7 +376,10 @@ public abstract class QueueEntryImpl imp
                 txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
             }
 
-            int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action);
+            int enqueues = alternateExchange.send(getMessage(),
+                                                  getInstanceProperties(),
+                                                  txn,
+                                                  action);
 
             txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action()
             {
@@ -409,21 +412,21 @@ public abstract class QueueEntryImpl imp
         return getQueue().isDeleted();
     }
 
-    public void addStateChangeListener(StateChangeListener<MessageInstance, State> listener)
+    public void addStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
     {
-        Set<StateChangeListener<MessageInstance, State>> listeners = _stateChangeListeners;
+        Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners;
         if(listeners == null)
         {
-            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance, State>>());
+            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance<QueueConsumer>, State>>());
             listeners = _stateChangeListeners;
         }
 
         listeners.add(listener);
     }
 
-    public boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener)
+    public boolean removeStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
     {
-        Set<StateChangeListener<MessageInstance, State>> listeners = _stateChangeListeners;
+        Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners;
         if(listeners != null)
         {
             return listeners.remove(listener);
@@ -491,7 +494,7 @@ public abstract class QueueEntryImpl imp
     @Override
     public boolean resend() throws AMQException
     {
-        Consumer sub = getDeliveredConsumer();
+        QueueConsumer sub = getDeliveredConsumer();
         if(sub != null)
         {
             return sub.resend(this);

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Thu Feb  6 00:41:07 2014
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
 
 public interface QueueEntryList<Q extends QueueEntry>
 {
-    AMQQueue getQueue();
+    AMQQueue<QueueConsumer> getQueue();
 
     Q add(ServerMessage message);
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Feb  6 00:41:07 2014
@@ -56,7 +56,7 @@ import org.apache.qpid.server.util.Actio
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-public class SimpleAMQQueue implements AMQQueue,
+public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
                                        StateChangeListener<QueueConsumer, Consumer.State>,
                                        MessageGroupManager.ConsumerResetHelper
 {
@@ -525,9 +525,9 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public Collection<Consumer> getConsumers()
+    public Collection<QueueConsumer> getConsumers()
     {
-        List<Consumer> consumers = new ArrayList<Consumer>();
+        List<QueueConsumer> consumers = new ArrayList<QueueConsumer>();
         QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator();
         while(iter.advance())
         {
@@ -636,7 +636,7 @@ public class SimpleAMQQueue implements A
         enqueue(message, null);
     }
 
-    public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
+    public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException
     {
         incrementQueueCount();
         incrementQueueSize(message);
@@ -1464,7 +1464,7 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void flushConsumer(Consumer sub) throws AMQException
+    void flushConsumer(QueueConsumer sub) throws AMQException
     {
         // Access control
         if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
@@ -1474,7 +1474,7 @@ public class SimpleAMQQueue implements A
         flushConsumer(sub, Long.MAX_VALUE);
     }
 
-    public boolean flushConsumer(Consumer sub, long iterations) throws AMQException
+    boolean flushConsumer(QueueConsumer sub, long iterations) throws AMQException
     {
         boolean atTail = false;
         final boolean keepSendLockHeld = iterations <=  SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
@@ -1968,7 +1968,7 @@ public class SimpleAMQQueue implements A
         return _notificationChecks;
     }
 
-    private final class QueueEntryListener implements StateChangeListener<MessageInstance, QueueEntry.State>
+    private final class QueueEntryListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State>
     {
 
         private final QueueConsumer _sub;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Thu Feb  6 00:41:07 2014
@@ -39,7 +39,7 @@ public class SimpleQueueEntryList implem
         (SimpleQueueEntryList.class, SimpleQueueEntryImpl.class, "_tail");
 
 
-    private final AMQQueue _queue;
+    private final AMQQueue<QueueConsumer> _queue;
 
     static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl>
                 _nextUpdater = SimpleQueueEntryImpl._nextUpdater;
@@ -49,7 +49,7 @@ public class SimpleQueueEntryList implem
     private final AtomicReference<SimpleQueueEntryImpl> _unscavengedHWM = new AtomicReference<SimpleQueueEntryImpl>();
 
 
-    public SimpleQueueEntryList(AMQQueue queue)
+    public SimpleQueueEntryList(AMQQueue<QueueConsumer> queue)
     {
         _queue = queue;
         _head = new SimpleQueueEntryImpl(this);
@@ -71,7 +71,7 @@ public class SimpleQueueEntryList implem
     }
 
 
-    public AMQQueue getQueue()
+    public AMQQueue<QueueConsumer> getQueue()
     {
         return _queue;
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Thu Feb  6 00:41:07 2014
@@ -50,7 +50,7 @@ public class SortedQueue extends OutOfOr
         return _sortedPropertyName;
     }
 
-    public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
+    public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException
     {
         synchronized (_sortedQueueLock)
         {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Thu Feb  6 00:41:07 2014
@@ -28,7 +28,7 @@ import org.apache.qpid.server.queue.Sort
  * Uses the red/black tree algorithm specified in "Introduction to Algorithms".
  * ISBN-10: 0262033844
  * ISBN-13: 978-0262033848
- * @see http://en.wikipedia.org/wiki/Red-black_tree
+ * see http://en.wikipedia.org/wiki/Red-black_tree
  */
 public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl>
 {
@@ -36,17 +36,17 @@ public class SortedQueueEntryList implem
     private SortedQueueEntryImpl _root;
     private long _entryId = Long.MIN_VALUE;
     private final Object _lock = new Object();
-    private final AMQQueue _queue;
+    private final AMQQueue<QueueConsumer> _queue;
     private final String _propertyName;
 
-    public SortedQueueEntryList(final AMQQueue queue, final String propertyName)
+    public SortedQueueEntryList(final AMQQueue<QueueConsumer> queue, final String propertyName)
     {
         _queue = queue;
         _head = new SortedQueueEntryImpl(this);
         _propertyName = propertyName;
     }
 
-    public AMQQueue getQueue()
+    public AMQQueue<QueueConsumer> getQueue()
     {
         return _queue;
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java Thu Feb  6 00:41:07 2014
@@ -29,6 +29,7 @@ import java.util.Map;
 
 import java.util.Set;
 import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -46,7 +47,7 @@ public class DurableConfigurationStoreHe
                                                                                                   Queue.EXCLUSIVE,
                                                                                                   Queue.ALTERNATE_EXCHANGE));
 
-    public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
+    public static void updateQueue(DurableConfigurationStore store, AMQQueue<? extends Consumer> queue) throws AMQStoreException
     {
         Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
         attributesMap.put(Queue.NAME, queue.getName());
@@ -71,7 +72,7 @@ public class DurableConfigurationStoreHe
         store.update(queue.getId(), QUEUE, attributesMap);
     }
 
-    public static void createQueue(DurableConfigurationStore store, AMQQueue queue)
+    public static void createQueue(DurableConfigurationStore store, AMQQueue<? extends Consumer> queue)
             throws AMQStoreException
     {
         Map<String, Object> attributesMap = new HashMap<String, Object>();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Thu Feb  6 00:41:07 2014
@@ -25,6 +25,7 @@ import junit.framework.Assert;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
@@ -85,7 +86,7 @@ public class TopicExchangeTest extends Q
 
     public void testDirectMatch() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
+        AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
                 false, null);
         _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
 
@@ -108,7 +109,7 @@ public class TopicExchangeTest extends Q
 
     public void testStarMatch() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
+        AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
         _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null));
 
 
@@ -139,7 +140,7 @@ public class TopicExchangeTest extends Q
 
     public void testHashMatch() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
+        AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
         _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null));
 
 
@@ -190,7 +191,7 @@ public class TopicExchangeTest extends Q
 
     public void testMidHash() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+        AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
                 false, null);
         _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
 
@@ -216,7 +217,7 @@ public class TopicExchangeTest extends Q
 
     public void testMatchAfterHash() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+        AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
                 false, null);
         _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null));
 
@@ -255,7 +256,7 @@ public class TopicExchangeTest extends Q
 
     public void testHashAfterHash() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+        AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
                 false, null);
         _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null));
 
@@ -277,7 +278,7 @@ public class TopicExchangeTest extends Q
 
     public void testHashHash() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+        AMQQueue<Consumer> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
                 false, null);
         _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null));
 
@@ -321,7 +322,7 @@ public class TopicExchangeTest extends Q
         when(message.getMessageNumber()).thenReturn(messageNumber);
         for(BaseQueue q : queues)
         {
-            q.enqueue(message);
+            q.enqueue(message, null);
         }
 
         return queues.size();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Thu Feb  6 00:41:07 2014
@@ -51,19 +51,19 @@ public class AMQPriorityQueueTest extend
 
         // Enqueue messages in order
         SimpleAMQQueue queue = getQueue();
-        queue.enqueue(createMessage(1L, (byte) 10));
-        queue.enqueue(createMessage(2L, (byte) 4));
-        queue.enqueue(createMessage(3L, (byte) 0));
+        queue.enqueue(createMessage(1L, (byte) 10), null);
+        queue.enqueue(createMessage(2L, (byte) 4), null);
+        queue.enqueue(createMessage(3L, (byte) 0), null);
 
         // Enqueue messages in reverse order
-        queue.enqueue(createMessage(4L, (byte) 0));
-        queue.enqueue(createMessage(5L, (byte) 4));
-        queue.enqueue(createMessage(6L, (byte) 10));
+        queue.enqueue(createMessage(4L, (byte) 0), null);
+        queue.enqueue(createMessage(5L, (byte) 4), null);
+        queue.enqueue(createMessage(6L, (byte) 10), null);
 
         // Enqueue messages out of order
-        queue.enqueue(createMessage(7L, (byte) 4));
-        queue.enqueue(createMessage(8L, (byte) 10));
-        queue.enqueue(createMessage(9L, (byte) 0));
+        queue.enqueue(createMessage(7L, (byte) 4), null);
+        queue.enqueue(createMessage(8L, (byte) 10), null);
+        queue.enqueue(createMessage(9L, (byte) 0), null);
 
         // Register subscriber
         queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(Consumer.Option.class));

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Thu Feb  6 00:41:07 2014
@@ -43,7 +43,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-public class MockAMQQueue implements AMQQueue
+public class MockAMQQueue implements AMQQueue<QueueConsumer>
 {
     private boolean _deleted = false;
     private String _name;
@@ -208,7 +208,7 @@ public class MockAMQQueue implements AMQ
     }
 
     @Override
-    public Consumer addConsumer(final ConsumerTarget target,
+    public QueueConsumer addConsumer(final ConsumerTarget target,
                                 final FilterManager filters,
                                 final Class<? extends ServerMessage> messageClass,
                                 final String consumerName,
@@ -226,7 +226,7 @@ public class MockAMQQueue implements AMQ
 
 
 
-    public Collection<Consumer> getConsumers()
+    public Collection<QueueConsumer> getConsumers()
     {
         return Collections.emptyList();
     }
@@ -306,7 +306,7 @@ public class MockAMQQueue implements AMQ
     {
     }
 
-    public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
+    public void enqueue(ServerMessage message, Action<MessageInstance<QueueConsumer>> action) throws AMQException
     {
     }
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Thu Feb  6 00:41:07 2014
@@ -42,7 +42,7 @@ public class MockQueueEntry implements Q
         return false;
     }
 
-    public boolean acquire(Consumer sub)
+    public boolean acquire(QueueConsumer sub)
     {
         return false;
     }
@@ -58,12 +58,12 @@ public class MockQueueEntry implements Q
         return false;
     }
 
-    public boolean isAcquiredBy(Consumer consumer)
+    public boolean isAcquiredBy(QueueConsumer consumer)
     {
         return false;
     }
 
-    public void addStateChangeListener(StateChangeListener<MessageInstance, State> listener)
+    public void addStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
     {
 
     }
@@ -73,7 +73,7 @@ public class MockQueueEntry implements Q
 
     }
 
-    public int routeToAlternate(final Action<MessageInstance> action, final ServerTransaction txn)
+    public int routeToAlternate(final Action<MessageInstance<QueueConsumer>> action, final ServerTransaction txn)
     {
         return 0;
     }
@@ -88,7 +88,7 @@ public class MockQueueEntry implements Q
         return false;
     }
 
-    public Consumer getDeliveredConsumer()
+    public QueueConsumer getDeliveredConsumer()
     {
         return null;
     }
@@ -103,7 +103,7 @@ public class MockQueueEntry implements Q
         return _message;
     }
 
-    public AMQQueue getQueue()
+    public AMQQueue<QueueConsumer> getQueue()
     {
         return null;
     }
@@ -126,7 +126,7 @@ public class MockQueueEntry implements Q
     }
 
 
-    public boolean isRejectedBy(Consumer consumer)
+    public boolean isRejectedBy(QueueConsumer consumer)
     {
 
         return false;
@@ -153,7 +153,7 @@ public class MockQueueEntry implements Q
     }
 
 
-    public boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener)
+    public boolean removeStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
     {
 
         return false;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Thu Feb  6 00:41:07 2014
@@ -21,11 +21,11 @@ package org.apache.qpid.server.queue;
 import junit.framework.TestCase;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.MessageInstance.EntryState;
-import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 
 import java.lang.reflect.Field;
 
@@ -113,16 +113,16 @@ public abstract class QueueEntryImplTest
      */
     private void acquire()
     {
-        _queueEntry.acquire(newMockConsumer());
+        _queueEntry.acquire(newConsumer());
         assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
                 _queueEntry.isAcquired());
     }
 
-    private Consumer newMockConsumer()
+    private QueueConsumer newConsumer()
     {
-        final Consumer consumer = mock(Consumer.class);
-        when(consumer.getOwningState()).thenReturn(new MessageInstance.ConsumerAcquiredState(consumer));
-        when(consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
+        final ConsumerTarget target = mock(ConsumerTarget.class);
+        when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class));
+        final QueueConsumer consumer = new QueueConsumer(null,null,true,true,"mock",false,target);
         return consumer;
     }
 
@@ -153,7 +153,7 @@ public abstract class QueueEntryImplTest
      */
     public void testRejectAndRejectedBy()
     {
-        Consumer sub = newMockConsumer();
+        QueueConsumer sub = newConsumer();
 
         assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
         assertFalse("Queue entry should not yet have been acquired by a consumer", _queueEntry.isAcquired());
@@ -167,7 +167,7 @@ public abstract class QueueEntryImplTest
         assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
 
         //repeat rejection using a second consumer
-        Consumer sub2 = newMockConsumer();
+        QueueConsumer sub2 = newConsumer();
 
         assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
         assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Thu Feb  6 00:41:07 2014
@@ -168,14 +168,15 @@ public class SimpleAMQQueueTest extends 
 
         // Check adding a consumer adds it to the queue
         _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                           EnumSet.noneOf(Consumer.Option.class));
+                                       EnumSet.of(Consumer.Option.ACQUIRES,
+                                                  Consumer.Option.SEES_REQUEUES));
         assertEquals("Queue does not have consumer", 1,
                      _queue.getConsumerCount());
         assertEquals("Queue does not have active consumer", 1,
                      _queue.getActiveConsumerCount());
 
         // Check sending a message ends up with the subscriber
-        _queue.enqueue(messageA);
+        _queue.enqueue(messageA, null);
         try
         {
             Thread.sleep(2000L);
@@ -194,7 +195,7 @@ public class SimpleAMQQueueTest extends 
                     1 == _queue.getActiveConsumerCount());
 
         ServerMessage messageB = createMessage(new Long (25));
-        _queue.enqueue(messageB);
+        _queue.enqueue(messageB, null);
          assertNull(_consumer.getQueueContext());
 
     }
@@ -202,9 +203,10 @@ public class SimpleAMQQueueTest extends 
     public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException
     {
         ServerMessage messageA = createMessage(new Long(24));
-        _queue.enqueue(messageA);
+        _queue.enqueue(messageA, null);
         _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                           EnumSet.noneOf(Consumer.Option.class));
+                                       EnumSet.of(Consumer.Option.ACQUIRES,
+                                                  Consumer.Option.SEES_REQUEUES));
         Thread.sleep(150);
         assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
         assertNull("There should be no releasedEntry after an enqueue",
@@ -218,10 +220,11 @@ public class SimpleAMQQueueTest extends 
     {
         ServerMessage messageA = createMessage(new Long(24));
         ServerMessage messageB = createMessage(new Long(25));
-        _queue.enqueue(messageA);
-        _queue.enqueue(messageB);
+        _queue.enqueue(messageA, null);
+        _queue.enqueue(messageB, null);
         _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                           EnumSet.noneOf(Consumer.Option.class));
+                                       EnumSet.of(Consumer.Option.ACQUIRES,
+                                                  Consumer.Option.SEES_REQUEUES));
         Thread.sleep(150);
         assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
         assertNull("There should be no releasedEntry after enqueues",
@@ -245,13 +248,7 @@ public class SimpleAMQQueueTest extends 
                                                       Consumer.Option.SEES_REQUEUES));
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
-        {
-            public void performAction(MessageInstance entry)
-            {
-                queueEntries.add((QueueEntry) entry);
-            }
-        };
+        EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
 
         /* Enqueue three messages */
 
@@ -298,13 +295,7 @@ public class SimpleAMQQueueTest extends 
                                                       Consumer.Option.ACQUIRES));
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
-        {
-            public void performAction(MessageInstance entry)
-            {
-                queueEntries.add((QueueEntry) entry);
-            }
-        };
+        EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
 
         /* Enqueue one message with expiration set for a short time in the future */
 
@@ -356,13 +347,7 @@ public class SimpleAMQQueueTest extends 
                                                       Consumer.Option.SEES_REQUEUES));
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
-        {
-            public void performAction(MessageInstance entry)
-            {
-                queueEntries.add((QueueEntry) entry);
-            }
-        };
+        EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
 
         /* Enqueue three messages */
 
@@ -420,14 +405,7 @@ public class SimpleAMQQueueTest extends 
 
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
-        {
-            public void performAction(MessageInstance entry)
-            {
-                queueEntries.add((QueueEntry)entry);
-            }
-        };
-
+        EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
 
         /* Enqueue two messages */
 
@@ -460,7 +438,8 @@ public class SimpleAMQQueueTest extends 
         // Check adding an exclusive consumer adds it to the queue
 
         _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                           EnumSet.of(Consumer.Option.EXCLUSIVE));
+                                           EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES,
+                                                      Consumer.Option.SEES_REQUEUES));
 
         assertEquals("Queue does not have consumer", 1,
                      _queue.getConsumerCount());
@@ -468,7 +447,7 @@ public class SimpleAMQQueueTest extends 
                      _queue.getActiveConsumerCount());
 
         // Check sending a message ends up with the subscriber
-        _queue.enqueue(messageA);
+        _queue.enqueue(messageA, null);
         try
         {
             Thread.sleep(2000L);
@@ -485,7 +464,8 @@ public class SimpleAMQQueueTest extends 
         {
 
             _queue.addConsumer(subB, null, messageA.getClass(), "test",
-                               EnumSet.noneOf(Consumer.Option.class));
+                               EnumSet.of(Consumer.Option.ACQUIRES,
+                                          Consumer.Option.SEES_REQUEUES));
 
         }
         catch (AMQException e)
@@ -498,7 +478,8 @@ public class SimpleAMQQueueTest extends 
         // existing consumer
         _consumer.close();
         _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
-                                           EnumSet.noneOf(Consumer.Option.class));
+                                       EnumSet.of(Consumer.Option.ACQUIRES,
+                                                  Consumer.Option.SEES_REQUEUES));
 
         try
         {
@@ -522,9 +503,10 @@ public class SimpleAMQQueueTest extends 
 
         ServerMessage message = createMessage(new Long(25));
         _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
-                                           EnumSet.noneOf(Consumer.Option.class));
+                                       EnumSet.of(Consumer.Option.ACQUIRES,
+                                                  Consumer.Option.SEES_REQUEUES));
 
-       _queue.enqueue(message);
+       _queue.enqueue(message, null);
        _consumer.close();
        assertTrue("Queue was not deleted when consumer was removed",
                   _queue.isDeleted());
@@ -536,12 +518,27 @@ public class SimpleAMQQueueTest extends 
         ServerMessage message = createMessage(id);
 
         _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
-                                           EnumSet.noneOf(Consumer.Option.class));
+                                           EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+
+        _queue.enqueue(message, new Action<MessageInstance<QueueConsumer>>()
+        {
+            @Override
+            public void performAction(final MessageInstance<QueueConsumer> object)
+            {
+                QueueEntry entry = (QueueEntry) object;
+                entry.setRedelivered();
+                try
+                {
+                    _consumer.resend(entry);
+                }
+                catch (AMQException e)
+                {
+                    fail("Exception thrown: " + e.getMessage());
+                }
+            }
+        });
+
 
-        _queue.enqueue(message);
-        QueueEntry entry = _consumer.getQueueContext().getLastSeenEntry();
-        entry.setRedelivered();
-        _consumer.resend(entry);
 
     }
 
@@ -552,7 +549,7 @@ public class SimpleAMQQueueTest extends 
         ServerMessage message = createMessage(messageId);
 
         // Put message on queue
-        _queue.enqueue(message);
+        _queue.enqueue(message, null);
         // Get message id
         Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
 
@@ -568,7 +565,7 @@ public class SimpleAMQQueueTest extends 
             Long messageId = new Long(i);
             ServerMessage message = createMessage(messageId);
             // Put message on queue
-            _queue.enqueue(message);
+            _queue.enqueue(message, null);
         }
         // Get message ids
         List<Long> msgids = _queue.getMessagesOnTheQueue(5);
@@ -589,7 +586,7 @@ public class SimpleAMQQueueTest extends 
             Long messageId = new Long(i);
             ServerMessage message = createMessage(messageId);
             // Put message on queue
-            _queue.enqueue(message);
+            _queue.enqueue(message, null);
         }
         // Get message ids
         List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
@@ -610,7 +607,7 @@ public class SimpleAMQQueueTest extends 
             Long messageId = new Long(i);
             ServerMessage message = createMessage(messageId);
             // Put message on queue
-            _queue.enqueue(message);
+            _queue.enqueue(message, null);
         }
 
         // Get non-existent 0th QueueEntry & check returned list was empty
@@ -953,7 +950,8 @@ public class SimpleAMQQueueTest extends 
                                   null,
                                   entries.get(0).getMessage().getClass(),
                                   "test",
-                                  EnumSet.noneOf(Consumer.Option.class));
+                                  EnumSet.of(Consumer.Option.ACQUIRES,
+                                             Consumer.Option.SEES_REQUEUES));
 
             // process queue
             testQueue.processQueue(new QueueRunner(testQueue)
@@ -1018,7 +1016,7 @@ public class SimpleAMQQueueTest extends 
                                     }
 
                                     @Override
-                                    public boolean acquire(Consumer sub)
+                                    public boolean acquire(QueueConsumer sub)
                                     {
                                         if(message.getMessageNumber() % 2 == 0)
                                         {
@@ -1044,7 +1042,8 @@ public class SimpleAMQQueueTest extends 
                               null,
                               createMessage(-1l).getClass(),
                               "test",
-                              EnumSet.noneOf(Consumer.Option.class));
+                              EnumSet.of(Consumer.Option.ACQUIRES,
+                                         Consumer.Option.SEES_REQUEUES));
         }
         catch (AMQException e)
         {
@@ -1077,7 +1076,8 @@ public class SimpleAMQQueueTest extends 
                           null,
                           createMessage(-1l).getClass(),
                           "test",
-                          EnumSet.noneOf(Consumer.Option.class));
+                          EnumSet.of(Consumer.Option.ACQUIRES,
+                                     Consumer.Option.SEES_REQUEUES));
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
         //verify adding an inactive consumer doesn't increase the count
@@ -1089,7 +1089,8 @@ public class SimpleAMQQueueTest extends 
                           null,
                           createMessage(-1l).getClass(),
                           "test",
-                          EnumSet.noneOf(Consumer.Option.class));
+                          EnumSet.of(Consumer.Option.ACQUIRES,
+                                     Consumer.Option.SEES_REQUEUES));
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
         //verify behaviour in face of expected state changes:
@@ -1133,10 +1134,10 @@ public class SimpleAMQQueueTest extends 
         _queue.setNotificationListener(listener);
         _queue.setMaximumMessageCount(2);
 
-        _queue.enqueue(createMessage(new Long(24)));
+        _queue.enqueue(createMessage(new Long(24)), null);
         verifyZeroInteractions(listener);
 
-        _queue.enqueue(createMessage(new Long(25)));
+        _queue.enqueue(createMessage(new Long(25)), null);
 
         verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
     }
@@ -1145,9 +1146,9 @@ public class SimpleAMQQueueTest extends 
     {
         AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class);
 
-        _queue.enqueue(createMessage(new Long(24)));
-        _queue.enqueue(createMessage(new Long(25)));
-        _queue.enqueue(createMessage(new Long(26)));
+        _queue.enqueue(createMessage(new Long(24)), null);
+        _queue.enqueue(createMessage(new Long(25)), null);
+        _queue.enqueue(createMessage(new Long(26)), null);
 
         _queue.setNotificationListener(listener);
         _queue.setMaximumMessageCount(2);
@@ -1309,6 +1310,34 @@ public class SimpleAMQQueueTest extends 
         return message;
     }
 
+    /**
+     * Post-enqueue action that records every entry it is handed in a caller-supplied list,
+     * so that a test can later inspect the entries created by its enqueue calls.
+     */
+    private static class EntryListAddingAction implements Action<MessageInstance<QueueConsumer>>
+    {
+        private final ArrayList<QueueEntry> _queueEntries;
+
+        /**
+         * @param queueEntries list to which each entry passed to {@link #performAction} is appended
+         */
+        public EntryListAddingAction(final ArrayList<QueueEntry> queueEntries)
+        {
+            _queueEntries = queueEntries;
+        }
+
+        /**
+         * Appends the given entry to the backing list.
+         *
+         * @param entry the instance handed to this action; it is cast to {@link QueueEntry}
+         */
+        @Override
+        public void performAction(final MessageInstance<QueueConsumer> entry)
+        {
+            _queueEntries.add((QueueEntry) entry);
+        }
+    }
+
     class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
     {
         QueueEntryList _list;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Thu Feb  6 00:41:07 2014
@@ -46,6 +46,7 @@ import org.apache.qpid.AMQStoreException
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.TransactionTimeoutHelper;
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -105,18 +106,7 @@ public class ServerSession extends Sessi
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
     private ChannelLogSubject _logSubject;
     private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
-    private final Action<MessageInstance> _checkCapacityAction = new Action<MessageInstance>()
-    {
-        @Override
-        public void performAction(final MessageInstance entry)
-        {
-            TransactionLogResource queue = entry.getOwningResource();
-            if(queue instanceof CapacityChecker)
-            {
-                ((CapacityChecker)queue).checkCapacity(ServerSession.this);
-            }
-        }
-    };
+    private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
 
     public static interface MessageDispositionChangeListener
     {
@@ -938,4 +928,25 @@ public class ServerSession extends Sessi
         return getId().compareTo(o.getId());
     }
 
+    /**
+     * Action that asks the resource owning a message instance to check its capacity
+     * against this session, provided that resource is a {@link CapacityChecker}.
+     */
+    private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>>
+    {
+        /**
+         * @param entry instance whose owning resource is examined; resources that are not
+         *              capacity checkers are left untouched
+         */
+        @Override
+        public void performAction(final MessageInstance<C> entry)
+        {
+            final TransactionLogResource owner = entry.getOwningResource();
+            if(!(owner instanceof CapacityChecker))
+            {
+                return;
+            }
+            ((CapacityChecker) owner).checkCapacity(ServerSession.this);
+        }
+    }
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Feb  6 00:41:07 2014
@@ -1192,14 +1192,14 @@ public class AMQChannel implements AMQSe
     }
 
 
-    private class ImmediateAction implements Action<MessageInstance>
+    private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<C>>
     {
 
         public ImmediateAction()
         {
         }
 
-        public void performAction(MessageInstance entry)
+        public void performAction(MessageInstance<C> entry)
         {
             TransactionLogResource queue = entry.getOwningResource();
 
@@ -1258,10 +1258,10 @@ public class AMQChannel implements AMQSe
         }
     }
 
-    private final class CapacityCheckAction implements Action<MessageInstance>
+    private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<C>>
     {
         @Override
-        public void performAction(final MessageInstance entry)
+        public void performAction(final MessageInstance<C> entry)
         {
             TransactionLogResource queue = entry.getOwningResource();
             if(queue instanceof CapacityChecker)

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1565024&r1=1565023&r2=1565024&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Thu Feb  6 00:41:07 2014
@@ -62,7 +62,7 @@ public class ExtractResendAndRequeueTest
     private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
     private static final int INITIAL_MSG_COUNT = 10;
     private AMQQueue _queue;
-    private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
+    private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>();
     private Consumer _consumer;
     private boolean _queueDeleted;
 
@@ -115,11 +115,11 @@ public class ExtractResendAndRequeueTest
      *
      * @return Subscription that performed the acquire
      */
-    private void acquireMessages(LinkedList<QueueEntry> messageList)
+    private void acquireMessages(LinkedList<MessageInstance> messageList)
     {
 
         // Acquire messages in subscription
-        for(QueueEntry entry : messageList)
+        for(MessageInstance entry : messageList)
         {
             when(entry.getDeliveredConsumer()).thenReturn(_consumer);
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org