You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/03 03:15:01 UTC

svn commit: r1563758 [1/3] - in /qpid/branches/java-broker-amqp-1-0-management/java: broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/model/adapter/ broker-core/src/main/java/org/apache/qpid/se...

Author: rgodfrey
Date: Mon Feb  3 02:14:59 2014
New Revision: 1563758

URL: http://svn.apache.org/r1563758
Log:
Updates to subscription

Added:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
      - copied, changed from r1563433, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
      - copied, changed from r1563431, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
Removed:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
Modified:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
    qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Mon Feb  3 02:14:59 2014
@@ -39,8 +39,10 @@ import org.apache.qpid.server.model.UUID
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
@@ -66,7 +68,7 @@ public abstract class AbstractExchange i
 
     private VirtualHost _virtualHost;
 
-    private final List<Task> _closeTaskList = new CopyOnWriteArrayList<Task>();
+    private final List<Action<Exchange>> _closeTaskList = new CopyOnWriteArrayList<Action<Exchange>>();
 
     /**
      * Whether the exchange is automatically deleted once all queues have detached from it
@@ -138,6 +140,12 @@ public abstract class AbstractExchange i
 
         if(_closed.compareAndSet(false,true))
         {
+            List<Binding> bindings = new ArrayList<Binding>(_bindings);
+            for(Binding binding : bindings)
+            {
+                removeBinding(binding);
+            }
+
             if(_alternateExchange != null)
             {
                 _alternateExchange.removeReference(this);
@@ -145,9 +153,9 @@ public abstract class AbstractExchange i
 
             CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED());
 
-            for(Task task : _closeTaskList)
+            for(Action<Exchange> task : _closeTaskList)
             {
-                task.onClose(this);
+                task.performAction(this);
             }
             _closeTaskList.clear();
         }
@@ -300,12 +308,12 @@ public abstract class AbstractExchange i
         return !_referrers.isEmpty();
     }
 
-    public void addCloseTask(final Task task)
+    public void addCloseTask(final Action<Exchange> task)
     {
         _closeTaskList.add(task);
     }
 
-    public void removeCloseTask(final Task task)
+    public void removeCloseTask(final Action<Exchange> task)
     {
         _closeTaskList.remove(task);
     }
@@ -421,7 +429,7 @@ public abstract class AbstractExchange i
     public final int send(final ServerMessage message,
                           final InstanceProperties instanceProperties,
                           final ServerTransaction txn,
-                          final BaseQueue.PostEnqueueAction postEnqueueAction)
+                          final Action<QueueEntry> postEnqueueAction)
     {
         List<? extends BaseQueue> queues = route(message, instanceProperties);
 
@@ -579,8 +587,6 @@ public abstract class AbstractExchange i
         {
             doRemoveBinding(b);
             queue.removeBinding(b);
-            removeCloseTask(b);
-            queue.removeQueueDeleteTask(b);
 
             if (b.isDurable())
             {
@@ -659,8 +665,6 @@ public abstract class AbstractExchange i
                 DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
             }
 
-            queue.addQueueDeleteTask(b);
-            addCloseTask(b);
             queue.addBinding(b);
             doAddBinding(b);
             b.logCreation();
@@ -673,7 +677,7 @@ public abstract class AbstractExchange i
         }
     }
 
-    private final class BindingImpl extends Binding implements AMQQueue.Task, Task
+    private final class BindingImpl extends Binding
     {
         private final BindingLogSubject _logSubject;
         //TODO : persist creation time
@@ -689,12 +693,6 @@ public abstract class AbstractExchange i
 
         }
 
-
-        public void doTask(final AMQQueue queue) throws AMQException
-        {
-            removeBinding(this);
-        }
-
         public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException
         {
             removeBinding(this);
@@ -729,11 +727,4 @@ public abstract class AbstractExchange i
 
     }
 
-    public static interface Task
-    {
-        public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
-    }
-
-
-
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Mon Feb  3 02:14:59 2014
@@ -42,8 +42,10 @@ import org.apache.qpid.server.model.UUID
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class DefaultExchange implements Exchange
@@ -334,7 +336,7 @@ public class DefaultExchange implements 
     public final int send(final ServerMessage message,
                           final InstanceProperties instanceProperties,
                           final ServerTransaction txn,
-                          final BaseQueue.PostEnqueueAction postEnqueueAction)
+                          final Action<QueueEntry> postEnqueueAction)
     {
         final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
         if(q == null)

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java Mon Feb  3 02:14:59 2014
@@ -29,7 +29,9 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
@@ -105,7 +107,7 @@ public interface Exchange extends Exchan
     int send(ServerMessage message,
              InstanceProperties instanceProperties,
              ServerTransaction txn,
-             BaseQueue.PostEnqueueAction postEnqueueAction);
+             Action<QueueEntry> postEnqueueAction);
 
     /**
      * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java Mon Feb  3 02:14:59 2014
@@ -47,7 +47,7 @@ public class ConsumerAdapter extends Abs
                                                queueAdapter.getName(),
                                                subscription.getSessionModel().getConnectionModel().getRemoteAddressString(),
                                                String.valueOf(subscription.getSessionModel().getChannelId()),
-                                               subscription.getConsumerName()), queueAdapter.getTaskExecutor());
+                                               subscription.getName()), queueAdapter.getTaskExecutor());
         _subscription = subscription;
         _queue = queueAdapter;
         _session = sessionAdapter;
@@ -57,7 +57,7 @@ public class ConsumerAdapter extends Abs
 
     public String getName()
     {
-        return _subscription.getConsumerName();
+        return _subscription.getName();
     }
 
     public String setName(final String currentName, final String desiredName)

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Feb  3 02:14:59 2014
@@ -30,6 +30,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
@@ -132,8 +133,8 @@ public interface AMQQueue extends Compar
 
     boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
 
-    void addQueueDeleteTask(final Task task);
-    void removeQueueDeleteTask(final Task task);
+    void addQueueDeleteTask(Action<AMQQueue> task);
+    void removeQueueDeleteTask(Action<AMQQueue> task);
 
 
 
@@ -271,11 +272,6 @@ public interface AMQQueue extends Compar
         }
     }
 
-    static interface Task
-    {
-        public void doTask(AMQQueue queue) throws AMQException;
-    }
-
     void configure(QueueConfiguration config);
 
     void setExclusive(boolean exclusive);

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Mon Feb  3 02:14:59 2014
@@ -24,17 +24,12 @@ package org.apache.qpid.server.queue;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.util.Action;
 
 public interface BaseQueue extends TransactionLogResource
 {
-    public static interface PostEnqueueAction
-    {
-        public void onEnqueue(QueueEntry entry);
-    }
-
     void enqueue(ServerMessage message) throws AMQException;
-    void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException;
-    void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException;
+    void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException;
 
     boolean isDurable();
     boolean isDeleted();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Feb  3 02:14:59 2014
@@ -25,6 +25,8 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
 
 public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
 {
@@ -42,11 +44,6 @@ public interface QueueEntry extends Mess
 
     }
 
-    public static interface StateChangeListener
-    {
-        public void stateChanged(QueueEntry entry, State oldSate, State newState);
-    }
-
     public abstract class EntryState
     {
         private EntryState()
@@ -198,7 +195,7 @@ public interface QueueEntry extends Mess
 
     boolean isRejectedBy(long subscriptionId);
 
-    int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn);
+    int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn);
 
     boolean isQueueDeleted();
 
@@ -206,8 +203,8 @@ public interface QueueEntry extends Mess
 
     QueueEntry getNextValidEntry();
 
-    void addStateChangeListener(StateChangeListener listener);
-    boolean removeStateChangeListener(StateChangeListener listener);
+    void addStateChangeListener(StateChangeListener<QueueEntry, State> listener);
+    boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener);
 
 
     /**

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Feb  3 02:14:59 2014
@@ -31,6 +31,8 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
 
 import java.util.EnumMap;
 import java.util.HashSet;
@@ -59,7 +61,7 @@ public abstract class QueueEntryImpl imp
         (QueueEntryImpl.class, EntryState.class, "_state");
 
 
-    private volatile Set<StateChangeListener> _stateChangeListeners;
+    private volatile Set<StateChangeListener<QueueEntry, State>> _stateChangeListeners;
 
     private static final
         AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -231,11 +233,6 @@ public abstract class QueueEntryImpl imp
             if(state instanceof SubscriptionAcquiredState)
             {
                 getQueue().decrementUnackedMsgCount(this);
-                Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription();
-                if (subscription != null)
-                {
-                    subscription.releaseQueueEntry(this);
-                }
             }
 
             if(!getQueue().isDeleted())
@@ -320,8 +317,6 @@ public abstract class QueueEntryImpl imp
             if (state instanceof SubscriptionAcquiredState)
             {
                 getQueue().decrementUnackedMsgCount(this);
-                s = ((SubscriptionAcquiredState) state).getSubscription();
-                s.onDequeue(this);
             }
 
             getQueue().dequeue(this,s);
@@ -336,7 +331,7 @@ public abstract class QueueEntryImpl imp
 
     private void notifyStateChange(final State oldState, final State newState)
     {
-        for(StateChangeListener l : _stateChangeListeners)
+        for(StateChangeListener<QueueEntry, State> l : _stateChangeListeners)
         {
             l.stateChanged(this, oldState, newState);
         }
@@ -367,7 +362,7 @@ public abstract class QueueEntryImpl imp
         dispose();
     }
 
-    public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn)
+    public int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn)
     {
         final AMQQueue currentQueue = getQueue();
         Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -412,21 +407,21 @@ public abstract class QueueEntryImpl imp
         return getQueue().isDeleted();
     }
 
-    public void addStateChangeListener(StateChangeListener listener)
+    public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
     {
-        Set<StateChangeListener> listeners = _stateChangeListeners;
+        Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
         if(listeners == null)
         {
-            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
+            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<QueueEntry, State>>());
             listeners = _stateChangeListeners;
         }
 
         listeners.add(listener);
     }
 
-    public boolean removeStateChangeListener(StateChangeListener listener)
+    public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
     {
-        Set<StateChangeListener> listeners = _stateChangeListeners;
+        Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
         if(listeners != null)
         {
             return listeners.remove(listener);

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Feb  3 02:14:59 2014
@@ -61,9 +61,13 @@ import org.apache.qpid.server.subscripti
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
+public class SimpleAMQQueue implements AMQQueue,
+                                       StateChangeListener<Subscription, Subscription.State>,
+                                       MessageGroupManager.SubscriptionResetHelper
 {
 
     private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -121,10 +125,6 @@ public class SimpleAMQQueue implements A
     private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
     private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
     private final AtomicInteger _consumerCountHigh = new AtomicInteger(0);
-    private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
-    private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
-    private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
-    private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
     private final AtomicLong _unackedMsgCount = new AtomicLong(0);
     private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
     private final AtomicLong _unackedMsgBytes = new AtomicLong();
@@ -165,7 +165,7 @@ public class SimpleAMQQueue implements A
     private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
-    private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+    private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>();
 
 
     private LogSubject _logSubject;
@@ -451,7 +451,7 @@ public class SimpleAMQQueue implements A
 
             if (isDeleted())
             {
-                subscription.queueDeleted(this);
+                subscription.queueDeleted();
             }
         }
         else
@@ -505,7 +505,7 @@ public class SimpleAMQQueue implements A
 
                 // we need to manually fire the event to the removed subscription (which was the last one left for this
                 // queue. This is because the delete method uses the subscription set which has just been cleared
-                subscription.queueDeleted(this);
+                subscription.queueDeleted();
             }
         }
 
@@ -622,18 +622,8 @@ public class SimpleAMQQueue implements A
         enqueue(message, null);
     }
 
-    public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+    public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
     {
-        enqueue(message, false, action);
-    }
-
-    public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
-    {
-
-        if(transactional)
-        {
-            incrementTxnEnqueueStats(message);
-        }
         incrementQueueCount();
         incrementQueueSize(message);
 
@@ -715,7 +705,7 @@ public class SimpleAMQQueue implements A
 
         if(action != null)
         {
-            action.onEnqueue(entry);
+            action.performAction(entry);
         }
 
     }
@@ -810,18 +800,6 @@ public class SimpleAMQQueue implements A
         getAtomicQueueCount().incrementAndGet();
     }
 
-    private void incrementTxnEnqueueStats(final ServerMessage message)
-    {
-        _msgTxnEnqueues.incrementAndGet();
-        _byteTxnEnqueues.addAndGet(message.getSize());
-    }
-
-    private void incrementTxnDequeueStats(QueueEntry entry)
-    {
-        _msgTxnDequeues.incrementAndGet();
-        _byteTxnDequeues.addAndGet(entry.getSize());
-    }
-
     private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch)
             throws AMQException
     {
@@ -900,11 +878,6 @@ public class SimpleAMQQueue implements A
             _deliveredMessages.decrementAndGet();
         }
 
-        if(sub != null && sub.isSessionTransactional())
-        {
-            incrementTxnDequeueStats(entry);
-        }
-
         checkCapacity();
 
     }
@@ -1039,7 +1012,7 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
+    public void stateChanged(Subscription sub, Subscription.State oldState, Subscription.State newState)
     {
         if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
         {
@@ -1300,12 +1273,12 @@ public class SimpleAMQQueue implements A
                     });
     }
 
-    public void addQueueDeleteTask(final Task task)
+    public void addQueueDeleteTask(final Action<AMQQueue> task)
     {
         _deleteTaskList.add(task);
     }
 
-    public void removeQueueDeleteTask(final Task task)
+    public void removeQueueDeleteTask(final Action<AMQQueue> task)
     {
         _deleteTaskList.remove(task);
     }
@@ -1322,7 +1295,9 @@ public class SimpleAMQQueue implements A
         if (!_deleted.getAndSet(true))
         {
 
-            for (Binding b : _bindings)
+            final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings);
+
+            for (Binding b : bindingCopy)
             {
                 b.getExchange().removeBinding(b);
             }
@@ -1334,7 +1309,7 @@ public class SimpleAMQQueue implements A
                 Subscription s = subscriptionIter.getNode().getSubscription();
                 if (s != null)
                 {
-                    s.queueDeleted(this);
+                    s.queueDeleted();
                 }
             }
 
@@ -1375,9 +1350,9 @@ public class SimpleAMQQueue implements A
             }
 
 
-            for (Task task : _deleteTaskList)
+            for (Action<AMQQueue> task : _deleteTaskList)
             {
-                task.doTask(this);
+                task.performAction(this);
             }
 
             _deleteTaskList.clear();
@@ -1984,7 +1959,7 @@ public class SimpleAMQQueue implements A
         return _notificationChecks;
     }
 
-    private final class QueueEntryListener implements QueueEntry.StateChangeListener
+    private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State>
     {
 
         private final Subscription _sub;
@@ -2076,26 +2051,6 @@ public class SimpleAMQQueue implements A
         return _dequeueSize.get();
     }
 
-    public long getByteTxnEnqueues()
-    {
-        return _byteTxnEnqueues.get();
-    }
-
-    public long getByteTxnDequeues()
-    {
-        return _byteTxnDequeues.get();
-    }
-
-    public long getMsgTxnEnqueues()
-    {
-        return _msgTxnEnqueues.get();
-    }
-
-    public long getMsgTxnDequeues()
-    {
-        return _msgTxnDequeues.get();
-    }
-
     public long getPersistentByteEnqueues()
     {
         return _persistentMessageEnqueueSize.get();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Mon Feb  3 02:14:59 2014
@@ -21,6 +21,7 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
@@ -48,7 +49,7 @@ public class SortedQueue extends OutOfOr
         return _sortedPropertyName;
     }
 
-    public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+    public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
     {
         synchronized (_sortedQueueLock)
         {

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java?rev=1563758&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java Mon Feb  3 02:14:59 2014
@@ -0,0 +1,344 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.SubscriptionActor;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public abstract class AbstractSubscription implements Subscription
+{
+    private final long _subscriptionID;
+    private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
+    private final Lock _stateChangeLock = new ReentrantLock();
+    private final long _createTime = System.currentTimeMillis();
+    private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+    private final boolean _acquires;
+    private final boolean _seesRequeues;
+    private final String _consumerName;
+    private final boolean _isTransient;
+
+
+    private final AtomicLong _deliveredCount = new AtomicLong(0);
+    private final AtomicLong _deliveredBytes = new AtomicLong(0);
+
+    private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+
+
+    private final FilterManager _filters;
+
+    private volatile AMQQueue.Context _queueContext;
+
+
+    private StateChangeListener<Subscription, State> _stateListener = new StateChangeListener<Subscription, State>()
+    {
+        public void stateChanged(Subscription sub, State oldState, State newState)
+        {
+            CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
+        }
+    };
+
+    private SubscriptionLogSubject _logSubject;
+    private AMQQueue _queue;
+    private String _traceExclude;
+    private String _trace;
+    private SubscriptionActor _logActor;
+    private final Class<? extends ServerMessage> _messageClass;
+    private final Object _sessionReference;
+    private boolean _noLocal;
+
+    protected AbstractSubscription(FilterManager filters,
+                                   final Class<? extends ServerMessage> messageClass,
+                                   final Object sessionReference,
+                                   final boolean acquires,
+                                   final boolean seesRequeues,
+                                   final String consumerName, final boolean isTransient)
+    {
+        _messageClass = messageClass;
+        _sessionReference = sessionReference;
+        _subscriptionID = SUB_ID_GENERATOR.getAndIncrement();
+        _filters = filters;
+        _acquires = acquires;
+        _seesRequeues = seesRequeues;
+        _consumerName = consumerName;
+        _isTransient = isTransient;
+    }
+
+    public final long getSubscriptionID()
+    {
+        return _subscriptionID;
+    }
+
+
+    public final StateChangeListener<Subscription, State> getStateListener()
+    {
+        return _stateListener;
+    }
+
+    public final void setStateListener(StateChangeListener<Subscription, State> listener)
+    {
+        _stateListener = listener;
+    }
+
+
+    public final AMQQueue.Context getQueueContext()
+    {
+        return _queueContext;
+    }
+
+    public final void setQueueContext(AMQQueue.Context queueContext)
+    {
+        _queueContext = queueContext;
+    }
+
+
+    public State getState()
+    {
+        return _state.get();
+    }
+
+    protected boolean updateState(State from, State to)
+    {
+        return _state.compareAndSet(from, to);
+    }
+
+    public final boolean isActive()
+    {
+        return getState() == State.ACTIVE;
+    }
+
+    public final boolean isClosed()
+    {
+        return getState() == State.CLOSED;
+    }
+
+
+    public final void setNoLocal(boolean noLocal)
+    {
+        _noLocal = noLocal;
+    }
+
+
+    public final boolean hasInterest(QueueEntry entry)
+    {
+       //check that the message hasn't been rejected
+        if (entry.isRejectedBy(getSubscriptionID()))
+        {
+
+            return false;
+        }
+
+        if (entry.getMessage().getClass() == _messageClass)
+        {
+            if(_noLocal)
+            {
+                Object connectionRef = entry.getMessage().getConnectionReference();
+                if (connectionRef != null && connectionRef == _sessionReference)
+                {
+                    return false;
+                }
+            }
+        }
+        else
+        {
+            // no interest in messages we can't convert
+            if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), _messageClass)==null)
+            {
+                return false;
+            }
+        }
+        return (_filters == null) || _filters.allAllow(entry.asFilterable());
+    }
+
+
+    protected String getFilterLogString()
+    {
+        StringBuilder filterLogString = new StringBuilder();
+        String delimiter = ", ";
+        boolean hasEntries = false;
+        if (_filters != null && _filters.hasFilters())
+        {
+            filterLogString.append(_filters.toString());
+            hasEntries = true;
+        }
+
+        if (!acquires())
+        {
+            if (hasEntries)
+            {
+                filterLogString.append(delimiter);
+            }
+            filterLogString.append("Browser");
+            hasEntries = true;
+        }
+
+        return filterLogString.toString();
+    }
+
+
+    public final boolean trySendLock()
+    {
+        return _stateChangeLock.tryLock();
+    }
+
+    public final void getSendLock()
+    {
+        _stateChangeLock.lock();
+    }
+
+    public final void releaseSendLock()
+    {
+        _stateChangeLock.unlock();
+    }
+
+    public final AMQQueue getQueue()
+    {
+        return _queue;
+    }
+
+    public final void setQueue(AMQQueue queue, boolean exclusive)
+    {
+        if(getQueue() != null)
+        {
+            throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
+        }
+        _queue = queue;
+
+        _traceExclude = (String) queue.getAttribute(Queue.FEDERATION_EXCLUDES);
+        _trace = (String) queue.getAttribute(Queue.FEDERATION_ID);
+
+        _logSubject = new SubscriptionLogSubject(this);
+        _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
+
+        if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, _logSubject, SubscriptionMessages.CREATE_LOG_HIERARCHY))
+        {
+            final String filterLogString = getFilterLogString();
+            CurrentActor.get().message(_logSubject, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
+                                                                         filterLogString.length() > 0));
+        }
+    }
+
+    protected final String getTraceExclude()
+    {
+        return _traceExclude;
+    }
+
+    protected final String getTrace()
+    {
+        return _trace;
+    }
+
+    protected final LogSubject getLogSubject()
+    {
+        return _logSubject;
+    }
+
+    public final LogActor getLogActor()
+    {
+        return _logActor;
+    }
+
+    public final long getCreateTime()
+    {
+        return _createTime;
+    }
+
+
+    public final QueueEntry.SubscriptionAcquiredState getOwningState()
+    {
+        return _owningState;
+    }
+
+
+    public final void set(String key, Object value)
+    {
+        _properties.put(key, value);
+    }
+
+    public final Object get(String key)
+    {
+        return _properties.get(key);
+    }
+
+    public final boolean acquires()
+    {
+        return _acquires;
+    }
+
+    public final boolean seesRequeues()
+    {
+        return _seesRequeues;
+    }
+
+    public final String getName()
+    {
+        return _consumerName;
+    }
+
+    public final boolean isTransient()
+    {
+        return _isTransient;
+    }
+
+
+    public final long getBytesOut()
+    {
+        return _deliveredBytes.longValue();
+    }
+
+    public final long getMessagesOut()
+    {
+        return _deliveredCount.longValue();
+    }
+
+    public final void send(final QueueEntry entry, final boolean batch) throws AMQException
+    {
+        _deliveredCount.incrementAndGet();
+        _deliveredBytes.addAndGet(entry.getMessage().getSize());
+        doSend(entry, batch);
+    }
+
+    protected abstract void doSend(final QueueEntry entry, final boolean batch) throws AMQException;
+
+    @Override
+    public final void flush() throws AMQException
+    {
+        getQueue().flushSubscription(this);
+    }
+}

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java?rev=1563758&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscriptionTarget.java Mon Feb  3 02:14:59 2014
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractSubscriptionTarget implements SubscriptionTarget
+{
+
+    private final AtomicReference<State> _state;
+    private final AtomicReference<StateChangeListener<SubscriptionTarget, State>> _stateListener =
+            new AtomicReference<StateChangeListener<SubscriptionTarget, State>>();
+
+    protected AbstractSubscriptionTarget(final State initialState)
+    {
+        _state = new AtomicReference<State>(initialState);
+    }
+
+
+    public final State getState()
+    {
+        return _state.get();
+    }
+
+    protected final boolean updateState(State from, State to)
+    {
+        if(_state.compareAndSet(from, to))
+        {
+            StateChangeListener<SubscriptionTarget, State> listener = _stateListener.get();
+            if(listener != null)
+            {
+                listener.stateChanged(this, from, to);
+            }
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+
+    public final void setStateListener(StateChangeListener<SubscriptionTarget, State> listener)
+    {
+        _stateListener.set(listener);
+    }
+
+    public final StateChangeListener<SubscriptionTarget, State> getStateListener()
+    {
+        return _stateListener.get();
+    }
+
+}

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java Mon Feb  3 02:14:59 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.subscription;
 
 import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.util.StateChangeListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -240,7 +241,7 @@ public class DefinedGroupMessageGroupMan
         return groupVal;
     }
 
-    private class GroupStateChangeListener implements QueueEntry.StateChangeListener
+    private class GroupStateChangeListener implements StateChangeListener<QueueEntry, QueueEntry.State>
     {
         private final Group _group;
 

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java?rev=1563758&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java Mon Feb  3 02:14:59 2014
@@ -0,0 +1,189 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.SubscriptionMessages;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.util.StateChangeListener;
+
+import java.util.EnumMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class DelegatingSubscription<T extends SubscriptionTarget> extends AbstractSubscription
+{
+    private static final Logger _logger = Logger.getLogger(DelegatingSubscription.class);
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+    static final EnumMap<SubscriptionTarget.State, State> STATE_MAP =
+            new EnumMap<SubscriptionTarget.State, State>(SubscriptionTarget.State.class);
+
+    static
+    {
+        STATE_MAP.put(SubscriptionTarget.State.ACTIVE, State.ACTIVE);
+        STATE_MAP.put(SubscriptionTarget.State.SUSPENDED, State.SUSPENDED);
+        STATE_MAP.put(SubscriptionTarget.State.CLOSED, State.CLOSED);
+    }
+
+    private final T _target;
+
+    public DelegatingSubscription(final FilterManager filters,
+                                  final Class<? extends ServerMessage> messageClass,
+                                  final boolean acquires,
+                                  final boolean seesRequeues,
+                                  final String consumerName,
+                                  final boolean isTransient,
+                                  T target)
+    {
+        super(filters, messageClass, target.getSessionModel().getConnectionReference(),
+              acquires, seesRequeues, consumerName, isTransient);
+        _target = target;
+        _target.setStateListener(
+                new StateChangeListener<SubscriptionTarget, SubscriptionTarget.State>()
+                    {
+                        @Override
+                        public void stateChanged(final SubscriptionTarget object,
+                                                 final SubscriptionTarget.State oldState,
+                                                 final SubscriptionTarget.State newState)
+                        {
+                            targetStateChanged(oldState, newState);
+                        }
+                    });
+    }
+
+    private void targetStateChanged(final SubscriptionTarget.State oldState, final SubscriptionTarget.State newState)
+    {
+        if(oldState != newState)
+        {
+            if(newState == SubscriptionTarget.State.CLOSED)
+            {
+                if(_closed.compareAndSet(false,true))
+                {
+                    CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
+                }
+            }
+            else
+            {
+                CurrentActor.get().message(getLogSubject(),SubscriptionMessages.STATE(newState.toString()));
+            }
+        }
+
+        if(newState == SubscriptionTarget.State.CLOSED && oldState != newState)
+        {
+            try
+            {
+                getQueue().unregisterSubscription(this);
+            }
+            catch (AMQException e)
+            {
+                _logger.error("Unable to remove to remove subscription", e);
+                throw new RuntimeException(e);
+            }
+        }
+        final StateChangeListener<Subscription, State> stateListener = getStateListener();
+        if(stateListener != null)
+        {
+            stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
+        }
+    }
+
+    public T getTarget()
+    {
+        return _target;
+    }
+
+    @Override
+    public long getUnacknowledgedBytes()
+    {
+        return _target.getUnacknowledgedBytes();
+    }
+
+    @Override
+    public long getUnacknowledgedMessages()
+    {
+        return _target.getUnacknowledgedMessages();
+    }
+
+    @Override
+    public AMQSessionModel getSessionModel()
+    {
+        return _target.getSessionModel();
+    }
+
+    @Override
+    public boolean isSuspended()
+    {
+        return _target.isSuspended();
+    }
+
+    @Override
+    public void close()
+    {
+        _target.close();
+    }
+
+    @Override
+    protected void doSend(final QueueEntry entry, final boolean batch) throws AMQException
+    {
+        _target.send(entry, batch);
+    }
+
+    @Override
+    public void flushBatched()
+    {
+        _target.flushBatched();
+    }
+
+    @Override
+    public void queueDeleted()
+    {
+        _target.queueDeleted();
+    }
+
+    @Override
+    public boolean wouldSuspend(final QueueEntry msg)
+    {
+        return !_target.allocateCredit(msg);
+    }
+
+    @Override
+    public void restoreCredit(final QueueEntry queueEntry)
+    {
+        _target.restoreCredit(queueEntry);
+    }
+
+    @Override
+    public void queueEmpty() throws AMQException
+    {
+        _target.queueEmpty();
+    }
+
+    @Override
+    public State getState()
+    {
+        return STATE_MAP.get(_target.getState());
+    }
+}

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java Mon Feb  3 02:14:59 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.util.StateChangeListener;
 
 public interface Subscription
 {
@@ -50,11 +51,6 @@ public interface Subscription
         CLOSED
     }
 
-    public static interface StateListener
-    {
-        public void stateChange(Subscription sub, State oldState, State newState);
-    }
-
     AMQQueue getQueue();
     AMQSessionModel getSessionModel();
 
@@ -82,7 +78,7 @@ public interface Subscription
 
     void flushBatched();
 
-    void queueDeleted(AMQQueue queue);
+    void queueDeleted();
 
 
     boolean wouldSuspend(QueueEntry msg);
@@ -94,13 +90,9 @@ public interface Subscription
 
     void releaseSendLock();
 
-    void releaseQueueEntry(final QueueEntry queueEntryImpl);
-
-    void onDequeue(final QueueEntry queueEntry);
-
     void restoreCredit(final QueueEntry queueEntry);
 
-    void setStateListener(final StateListener listener);
+    void setStateListener(final StateChangeListener<Subscription, State> listener);
 
     public State getState();
 
@@ -115,9 +107,9 @@ public interface Subscription
 
     public Object get(String key);
 
-    boolean isSessionTransactional();
-
     void queueEmpty() throws AMQException;
 
-    String getConsumerName();
+    String getName();
+
+    void flush() throws AMQException;
 }

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java?rev=1563758&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java Mon Feb  3 02:14:59 2014
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.util.StateChangeListener;
+
+public interface SubscriptionTarget
+{
+
+
+    enum State
+    {
+        ACTIVE, SUSPENDED, CLOSED
+    }
+
+    State getState();
+
+    void setStateListener(StateChangeListener<SubscriptionTarget, State> listener);
+
+    long getUnacknowledgedBytes();
+
+    long getUnacknowledgedMessages();
+
+    AMQSessionModel getSessionModel();
+
+    void send(QueueEntry entry, boolean batch) throws AMQException;
+
+    void flushBatched();
+
+    void queueDeleted();
+
+    void queueEmpty() throws AMQException;
+
+    boolean allocateCredit(QueueEntry msg);
+
+    void restoreCredit(QueueEntry queueEntry);
+
+    boolean isSuspended();
+
+    boolean close();
+}

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java?rev=1563758&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java Mon Feb  3 02:14:59 2014
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public interface Action<T>
+{
+    void performAction(T object);
+}

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java?rev=1563758&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java Mon Feb  3 02:14:59 2014
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+public interface StateChangeListener<T, E extends Enum>
+{
+    void stateChanged(T object, E oldState, E newState);
+}

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Mon Feb  3 02:14:59 2014
@@ -141,7 +141,7 @@ public class VirtualHostConfigRecoveryHa
                             try
                             {
 
-                                queue.enqueue(message, true, null);
+                                queue.enqueue(message, null);
                                 ref.release();
                             }
                             catch (AMQException e)

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Mon Feb  3 02:14:59 2014
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
@@ -297,23 +298,15 @@ public class MockAMQQueue implements AMQ
     {
     }
 
-    public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+    public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
     {
     }
 
 
-    public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
-    {
-    }
-
     public void requeue(QueueEntry entry)
     {
     }
 
-    public void requeue(QueueEntryImpl storeContext, Subscription subscription)
-    {
-    }
-
     public void dequeue(QueueEntry entry, Subscription sub)
     {
     }
@@ -323,11 +316,11 @@ public class MockAMQQueue implements AMQ
         return false;
     }
 
-    public void addQueueDeleteTask(Task task)
+    public void addQueueDeleteTask(Action<AMQQueue> task)
     {
     }
 
-    public void removeQueueDeleteTask(final Task task)
+    public void removeQueueDeleteTask(final Action<AMQQueue> task)
     {
     }
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Mon Feb  3 02:14:59 2014
@@ -27,6 +27,8 @@ import org.apache.qpid.server.message.In
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
 
 public class MockQueueEntry implements QueueEntry
 {
@@ -53,7 +55,7 @@ public class MockQueueEntry implements Q
         return false;
     }
 
-    public void addStateChangeListener(StateChangeListener listener)
+    public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
     {
 
     }
@@ -63,7 +65,7 @@ public class MockQueueEntry implements Q
 
     }
 
-    public int routeToAlternate(final BaseQueue.PostEnqueueAction action, final ServerTransaction txn)
+    public int routeToAlternate(final Action<QueueEntry> action, final ServerTransaction txn)
     {
         return 0;
     }
@@ -137,7 +139,7 @@ public class MockQueueEntry implements Q
     }
 
 
-    public boolean removeStateChangeListener(StateChangeListener listener)
+    public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
     {
 
         return false;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Mon Feb  3 02:14:59 2014
@@ -41,10 +41,10 @@ import org.apache.qpid.server.message.AM
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
 import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
 import org.apache.qpid.server.subscription.MockSubscription;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -228,9 +228,9 @@ public class SimpleAMQQueueTest extends 
         _queue.registerSubscription(_subscription, false);
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+        Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
         {
-            public void onEnqueue(QueueEntry entry)
+            public void performAction(QueueEntry entry)
             {
                 queueEntries.add(entry);
             }
@@ -276,9 +276,9 @@ public class SimpleAMQQueueTest extends 
         _queue.registerSubscription(_subscription, false);
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+        Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
         {
-            public void onEnqueue(QueueEntry entry)
+            public void performAction(QueueEntry entry)
             {
                 queueEntries.add(entry);
             }
@@ -323,9 +323,9 @@ public class SimpleAMQQueueTest extends 
         _queue.registerSubscription(_subscription, false);
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+        Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
         {
-            public void onEnqueue(QueueEntry entry)
+            public void performAction(QueueEntry entry)
             {
                 queueEntries.add(entry);
             }
@@ -376,9 +376,9 @@ public class SimpleAMQQueueTest extends 
         _queue.registerSubscription(subscription2, false);
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        PostEnqueueAction postEnqueueAction = new PostEnqueueAction()
+        Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
         {
-            public void onEnqueue(QueueEntry entry)
+            public void performAction(QueueEntry entry)
             {
                 queueEntries.add(entry);
             }
@@ -1011,37 +1011,37 @@ public class SimpleAMQQueueTest extends 
         //verify behaviour in face of expected state changes:
 
         //verify a subscription going suspended->active increases the count
-        queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
+        queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
         assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
 
         //verify a subscription going active->suspended decreases the count
-        queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
+        queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
         //verify a subscription going suspended->closed doesn't change the count
-        queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED);
+        queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
         //verify a subscription going active->closed  decreases the count
-        queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED);
+        queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED);
         assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
 
         //verify behaviour in face of unexpected state changes:
 
         //verify a subscription going closed->active increases the count
-        queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE);
+        queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
         //verify a subscription going active->active doesn't change the count
-        queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
+        queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
         //verify a subscription going closed->suspended doesn't change the count
-        queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED);
+        queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
         //verify a subscription going suspended->suspended doesn't change the count
-        queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED);
+        queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
     }
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Mon Feb  3 02:14:59 2014
@@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
 import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.StateChangeListener;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -47,7 +48,7 @@ public class MockSubscription implements
     private boolean _closed = false;
     private String tag = "mocktag";
     private AMQQueue queue = null;
-    private StateListener _listener = null;
+    private StateChangeListener<Subscription, State> _listener = null;
     private volatile AMQQueue.Context _queueContext = null;
     private State _state = State.ACTIVE;
     private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
@@ -75,16 +76,22 @@ public class MockSubscription implements
         _closed = true;
         if (_listener != null)
         {
-            _listener.stateChange(this, _state, State.CLOSED);
+            _listener.stateChanged(this, _state, State.CLOSED);
         }
         _state = State.CLOSED;
     }
 
-    public String getConsumerName()
+    public String getName()
     {
         return tag;
     }
 
+    @Override
+    public void flush() throws AMQException
+    {
+
+    }
+
     public long getSubscriptionID()
     {
         return _subscriptionID;
@@ -202,7 +209,7 @@ public class MockSubscription implements
         return false;
     }
 
-    public void queueDeleted(AMQQueue queue)
+    public void queueDeleted()
     {
     }
 
@@ -211,18 +218,10 @@ public class MockSubscription implements
         _stateChangeLock.unlock();
     }
 
-    public void onDequeue(QueueEntry queueEntry)
-    {
-    }
-
     public void restoreCredit(QueueEntry queueEntry)
     {
     }
 
-    public void releaseQueueEntry(QueueEntry queueEntry)
-    {
-    }
-
     public void send(QueueEntry entry, boolean batch) throws AMQException
     {
         if (messages.contains(entry))
@@ -251,7 +250,7 @@ public class MockSubscription implements
     {
     }
 
-    public void setStateListener(StateListener listener)
+    public void setStateListener(StateChangeListener<Subscription, State> listener)
     {
         this._listener = listener;
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Mon Feb  3 02:14:59 2014
@@ -31,20 +31,19 @@ class ExplicitAcceptDispositionChangeLis
 
 
     private final QueueEntry _entry;
-    private final Subscription_0_10 _sub;
+    private final SubscriptionTarget_0_10 _target;
 
-    public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+    public ExplicitAcceptDispositionChangeListener(QueueEntry entry, SubscriptionTarget_0_10 target)
     {
         _entry = entry;
-        _sub = subscription_0_10;
+        _target = target;
     }
 
     public void onAccept()
     {
-        final Subscription_0_10 subscription = getSubscription();
-        if(subscription != null && _entry.isAcquiredBy(_sub))
+        if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
         {
-            subscription.getSessionModel().acknowledge(subscription, _entry);
+            _target.getSessionModel().acknowledge(_target, _entry);
         }
         else
         {
@@ -55,10 +54,9 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onRelease(boolean setRedelivered)
     {
-        final Subscription_0_10 subscription = getSubscription();
-        if(subscription != null && _entry.isAcquiredBy(_sub))
+        if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
         {
-            subscription.release(_entry, setRedelivered);
+            _target.release(_entry, setRedelivered);
         }
         else
         {
@@ -68,10 +66,9 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onReject()
     {
-        final Subscription_0_10 subscription = getSubscription();
-        if(subscription != null && _entry.isAcquiredBy(_sub))
+        if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
         {
-            subscription.reject(_entry);
+            _target.reject(_entry);
         }
         else
         {
@@ -82,12 +79,8 @@ class ExplicitAcceptDispositionChangeLis
 
     public boolean acquire()
     {
-        return _entry.acquire(getSubscription());
+        return _entry.acquire(_target.getSubscription());
     }
 
 
-    private Subscription_0_10 getSubscription()
-    {
-        return _sub;
-    }
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Mon Feb  3 02:14:59 2014
@@ -30,12 +30,12 @@ class ImplicitAcceptDispositionChangeLis
 
 
     private final QueueEntry _entry;
-    private Subscription_0_10 _sub;
+    private SubscriptionTarget_0_10 _target;
 
-    public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+    public ImplicitAcceptDispositionChangeListener(QueueEntry entry, SubscriptionTarget_0_10 target)
     {
         _entry = entry;
-        _sub = subscription_0_10;
+        _target = target;
     }
 
     public void onAccept()
@@ -45,9 +45,9 @@ class ImplicitAcceptDispositionChangeLis
 
     public void onRelease(boolean setRedelivered)
     {
-        if(_entry.isAcquiredBy(_sub))
+        if(_entry.isAcquiredBy(_target.getSubscription()))
         {
-            getSubscription().release(_entry, setRedelivered);
+            _target.release(_entry, setRedelivered);
         }
         else
         {
@@ -57,9 +57,9 @@ class ImplicitAcceptDispositionChangeLis
 
     public void onReject()
     {
-        if(_entry.isAcquiredBy(_sub))
+        if(_entry.isAcquiredBy(_target.getSubscription()))
         {
-            getSubscription().reject(_entry);
+            _target.reject(_entry);
         }
         else
         {
@@ -70,19 +70,15 @@ class ImplicitAcceptDispositionChangeLis
 
     public boolean acquire()
     {
-        boolean acquired = _entry.acquire(getSubscription());
+        boolean acquired = _entry.acquire(_target.getSubscription());
         if(acquired)
         {
-            getSubscription().recordUnacknowledged(_entry);
+            _target.recordUnacknowledged(_entry);
         }
         return acquired;
 
     }
 
-    public Subscription_0_10 getSubscription()
-    {
-        return _sub;
-    }
 
 
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1563758&r1=1563757&r2=1563758&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Mon Feb  3 02:14:59 2014
@@ -26,12 +26,12 @@ import org.apache.qpid.transport.Method;
 
 public class MessageAcceptCompletionListener implements Method.CompletionListener
 {
-    private final Subscription_0_10 _sub;
+    private final SubscriptionTarget_0_10 _sub;
     private final QueueEntry _entry;
     private final ServerSession _session;
     private boolean _restoreCredit;
 
-    public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
+    public MessageAcceptCompletionListener(SubscriptionTarget_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
     {
         super();
         _sub = sub;
@@ -46,7 +46,7 @@ public class MessageAcceptCompletionList
         {
             _sub.restoreCredit(_entry);
         }
-        if(_entry.isAcquiredBy(_sub))
+        if(_entry.isAcquiredBy(_sub.getSubscription()))
         {
             _session.acknowledge(_sub, _entry);
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org