You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/04 00:28:13 UTC

svn commit: r1564130 [1/2] - in /qpid/branches/java-broker-amqp-1-0-management/java: broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apache/qpid/server/subscription/ broker-core/src/test/java/org/apache/qpid/server...

Author: rgodfrey
Date: Mon Feb  3 23:28:12 2014
New Revision: 1564130

URL: http://svn.apache.org/r1564130
Log:
Change subscription registration for queues

Removed:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java
Modified:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SubscriptionTarget_1_0.java

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Feb  3 23:28:12 2014
@@ -25,15 +25,19 @@ import org.apache.qpid.server.binding.Bi
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeReferrer;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
 
@@ -84,7 +88,9 @@ public interface AMQQueue extends Compar
 
     VirtualHost getVirtualHost();
 
-    void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;
+    Subscription registerSubscription(final SubscriptionTarget target, final FilterManager filters,
+                                      final Class<? extends ServerMessage> messageClass,
+                                      final String consumerName, EnumSet<Subscription.Option> options) throws AMQException;
 
     void unregisterSubscription(final Subscription subscription) throws AMQException;
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Feb  3 23:28:12 2014
@@ -18,15 +18,7 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
@@ -43,6 +35,7 @@ import org.apache.qpid.server.binding.Bi
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -55,9 +48,11 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
 import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager;
+import org.apache.qpid.server.subscription.DelegatingSubscription;
 import org.apache.qpid.server.subscription.MessageGroupManager;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -390,7 +385,25 @@ public class SimpleAMQQueue implements A
 
     // ------ Manage Subscriptions
 
-    public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive)
+
+    @Override
+    public Subscription registerSubscription(final SubscriptionTarget target,
+                                             final FilterManager filters,
+                                             final Class<? extends ServerMessage> messageClass,
+                                             final String consumerName,
+                                             EnumSet<Subscription.Option> optionSet) throws AMQException
+    {
+
+        DelegatingSubscription sub = new DelegatingSubscription(filters, messageClass,
+                                                                optionSet.contains(Subscription.Option.ACQUIRES),
+                                                                optionSet.contains(Subscription.Option.SEES_REQUEUES),
+                                                                consumerName, optionSet.contains(Subscription.Option.TRANSIENT), target);
+        target.subscriptionRegistered(sub);
+        registerSubscription(sub, optionSet.contains(Subscription.Option.EXCLUSIVE));
+        return sub;
+    }
+
+    private synchronized void registerSubscription(final Subscription subscription, final boolean exclusive)
             throws AMQSecurityException, ExistingExclusiveSubscription, ExistingSubscriptionPreventsExclusive
     {
         // Access control
@@ -479,6 +492,7 @@ public class SimpleAMQQueue implements A
             setExclusiveSubscriber(null);
             subscription.setQueueContext(null);
 
+
             if(_messageGroupManager != null)
             {
                 resetSubPointersForGroups(subscription, true);

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AbstractSubscription.java Mon Feb  3 23:28:12 2014
@@ -178,7 +178,7 @@ public abstract class AbstractSubscripti
         else
         {
             // no interest in messages we can't convert
-            if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), _messageClass)==null)
+            if(_messageClass != null && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), _messageClass)==null)
             {
                 return false;
             }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DelegatingSubscription.java Mon Feb  3 23:28:12 2014
@@ -143,6 +143,7 @@ public class DelegatingSubscription<T ex
     public void close()
     {
         _target.close();
+        _target.subscriptionRemoved(this);
     }
 
     @Override

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java Mon Feb  3 23:28:12 2014
@@ -32,6 +32,15 @@ public interface Subscription
 {
     AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
 
+    enum Option
+    {
+        ACQUIRES,
+        SEES_REQUEUES,
+        TRANSIENT,
+        EXCLUSIVE,
+        NO_LOCAL
+    }
+
     LogActor getLogActor();
 
     boolean isTransient();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/subscription/SubscriptionTarget.java Mon Feb  3 23:28:12 2014
@@ -36,6 +36,10 @@ public interface SubscriptionTarget
 
     State getState();
 
+    void subscriptionRegistered(Subscription sub);
+
+    void subscriptionRemoved(Subscription sub);
+
     void setStateListener(StateChangeListener<SubscriptionTarget, State> listener);
 
     long getUnacknowledgedBytes();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java Mon Feb  3 23:28:12 2014
@@ -20,11 +20,17 @@
  */
 package org.apache.qpid.server.logging.actors;
 
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
 import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.util.BrokerTestHelper;
 
 import java.util.List;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Test : AMQPConnectionActorTest
  * Validate the AMQPConnectionActor class.
@@ -42,9 +48,11 @@ public class SubscriptionActorTest exten
     {
         super.setUp();
 
-        MockSubscription mockSubscription = new MockSubscription();
+        Subscription mockSubscription = mock(Subscription.class);
+        final AMQQueue queue = BrokerTestHelper.createQueue(getName(), getVirtualHost());
+        when(mockSubscription.getQueue()).thenReturn(queue);
+        when(mockSubscription.getSubscriptionID()).thenReturn(0l);
 
-        mockSubscription.setQueue(BrokerTestHelper.createQueue(getName(), getVirtualHost()), false);
 
         setAmqpActor(new SubscriptionActor(getRootLogger(), mockSubscription));
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Mon Feb  3 23:28:12 2014
@@ -28,7 +28,10 @@ import org.apache.qpid.server.message.AM
 import org.apache.qpid.server.message.ServerMessage;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
+
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.subscription.Subscription;
 
 import static org.mockito.Mockito.when;
 
@@ -62,7 +65,7 @@ public class AMQPriorityQueueTest extend
         queue.enqueue(createMessage(9L, (byte) 0));
 
         // Register subscriber
-        queue.registerSubscription(getSubscription(), false);
+        queue.registerSubscription(getSubscription(), null, null, "test", EnumSet.noneOf(Subscription.Option.class));
         Thread.sleep(150);
 
         ArrayList<QueueEntry> msgs = getSubscription().getMessages();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Mon Feb  3 23:28:12 2014
@@ -24,16 +24,20 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.subscription.DelegatingSubscription;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -203,15 +207,23 @@ public class MockAMQQueue implements AMQ
         return _virtualhost;
     }
 
+    @Override
+    public Subscription registerSubscription(final SubscriptionTarget target,
+                                             final FilterManager filters,
+                                             final Class<? extends ServerMessage> messageClass,
+                                             final String consumerName,
+                                             final EnumSet<Subscription.Option> options) throws AMQException
+    {
+        return new DelegatingSubscription(filters, messageClass, options.contains(Subscription.Option.ACQUIRES),
+                                          options.contains(Subscription.Option.SEES_REQUEUES), consumerName,
+                                          options.contains(Subscription.Option.TRANSIENT), target );
+    }
+
     public String getName()
     {
         return _name;
     }
 
-    public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException
-    {
-
-    }
 
     public void unregisterSubscription(Subscription subscription) throws AMQException
     {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Mon Feb  3 23:28:12 2014
@@ -113,11 +113,19 @@ public abstract class QueueEntryImplTest
      */
     private void acquire()
     {
-        _queueEntry.acquire(new MockSubscription());
+        _queueEntry.acquire(newMockSubscription());
         assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
                 _queueEntry.isAcquired());
     }
 
+    private Subscription newMockSubscription()
+    {
+        final Subscription subscription = mock(Subscription.class);
+        when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription));
+        when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
+        return subscription;
+    }
+
     /**
      * A helper method to get entry state
      *
@@ -145,7 +153,7 @@ public abstract class QueueEntryImplTest
      */
     public void testRejectAndRejectedBy()
     {
-        Subscription sub = new MockSubscription();
+        Subscription sub = newMockSubscription();
         long subId = sub.getSubscriptionID();
 
         assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
@@ -160,7 +168,7 @@ public abstract class QueueEntryImplTest
         assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
 
         //repeat rejection using a second subscription
-        Subscription sub2 = new MockSubscription();
+        Subscription sub2 = newMockSubscription();
         long sub2Id = sub2.getSubscriptionID();
 
         assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Mon Feb  3 23:28:12 2014
@@ -29,6 +29,8 @@ import static org.mockito.Matchers.conta
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.when;
 
+import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
@@ -44,6 +46,7 @@ import org.apache.qpid.server.model.UUID
 import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
 import org.apache.qpid.server.subscription.MockSubscription;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -65,7 +68,8 @@ public class SimpleAMQQueueTest extends 
     private String _owner = "owner";
     private String _routingKey = "routing key";
     private DirectExchange _exchange;
-    private MockSubscription _subscription = new MockSubscription();
+    private MockSubscription _subscriptionTarget = new MockSubscription();
+    private Subscription _subscription;
     private Map<String,Object> _arguments = null;
 
     @Override
@@ -159,17 +163,17 @@ public class SimpleAMQQueueTest extends 
 
     public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException
     {
+        ServerMessage messageA = createMessage(new Long(24));
+
         // Check adding a subscription adds it to the queue
-        _queue.registerSubscription(_subscription, false);
-        assertEquals("Subscription did not get queue", _queue,
-                      _subscription.getQueue());
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+                                                    EnumSet.noneOf(Subscription.Option.class));
         assertEquals("Queue does not have consumer", 1,
                      _queue.getConsumerCount());
         assertEquals("Queue does not have active consumer", 1,
-                _queue.getActiveConsumerCount());
+                     _queue.getActiveConsumerCount());
 
         // Check sending a message ends up with the subscriber
-        ServerMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA);
         try
         {
@@ -179,14 +183,14 @@ public class SimpleAMQQueueTest extends 
         {
         }
         assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
-        assertNull(((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+        assertNull(((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
 
         // Check removing the subscription removes it's information from the queue
         _queue.unregisterSubscription(_subscription);
-        assertTrue("Subscription still had queue", _subscription.isClosed());
+        assertTrue("Subscription still had queue", _subscriptionTarget.isClosed());
         assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
         assertFalse("Queue still has active consumer",
-                1 == _queue.getActiveConsumerCount());
+                    1 == _queue.getActiveConsumerCount());
 
         ServerMessage messageB = createMessage(new Long (25));
         _queue.enqueue(messageB);
@@ -198,10 +202,11 @@ public class SimpleAMQQueueTest extends 
     {
         ServerMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA);
-        _queue.registerSubscription(_subscription, false);
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+                                                    EnumSet.noneOf(Subscription.Option.class));
         Thread.sleep(150);
         assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
-        assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+        assertNull("There should be no releasedEntry after an enqueue", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
     }
 
     /**
@@ -213,10 +218,11 @@ public class SimpleAMQQueueTest extends 
         ServerMessage messageB = createMessage(new Long(25));
         _queue.enqueue(messageA);
         _queue.enqueue(messageB);
-        _queue.registerSubscription(_subscription, false);
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+                                                    EnumSet.noneOf(Subscription.Option.class));
         Thread.sleep(150);
         assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
-        assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+        assertNull("There should be no releasedEntry after enqueues", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
     }
 
     /**
@@ -225,7 +231,15 @@ public class SimpleAMQQueueTest extends 
      */
     public void testReleasedMessageIsResentToSubscriber() throws Exception
     {
-        _queue.registerSubscription(_subscription, false);
+
+        ServerMessage messageA = createMessage(new Long(24));
+        ServerMessage messageB = createMessage(new Long(25));
+        ServerMessage messageC = createMessage(new Long(26));
+
+
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+                                                    EnumSet.of(Subscription.Option.ACQUIRES,
+                                                               Subscription.Option.SEES_REQUEUES));
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
         Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -236,10 +250,6 @@ public class SimpleAMQQueueTest extends 
             }
         };
 
-        ServerMessage messageA = createMessage(new Long(24));
-        ServerMessage messageB = createMessage(new Long(25));
-        ServerMessage messageC = createMessage(new Long(26));
-
         /* Enqueue three messages */
 
         _queue.enqueue(messageA, postEnqueueAction);
@@ -248,7 +258,7 @@ public class SimpleAMQQueueTest extends 
 
         Thread.sleep(150);  // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size());
         assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
         assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
         assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -259,11 +269,11 @@ public class SimpleAMQQueueTest extends 
 
         Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to subscription", 4, _subscription.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to subscription", 4, _subscriptionTarget.getMessages().size());
         assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
         assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
         assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
-        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
     }
 
     /**
@@ -273,7 +283,11 @@ public class SimpleAMQQueueTest extends 
      */
     public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception
     {
-        _queue.registerSubscription(_subscription, false);
+        ServerMessage messageA = createMessage(new Long(24));
+
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
+                                                               Subscription.Option.ACQUIRES));
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
         Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -286,7 +300,6 @@ public class SimpleAMQQueueTest extends 
 
         /* Enqueue one message with expiration set for a short time in the future */
 
-        ServerMessage messageA = createMessage(new Long(24));
         int messageExpirationOffset = 200;
         final long expiration = System.currentTimeMillis() + messageExpirationOffset;
         when(messageA.getExpiration()).thenReturn(expiration);
@@ -296,7 +309,7 @@ public class SimpleAMQQueueTest extends 
         int subFlushWaitTime = 150;
         Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to subscription", 1, _subscription.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to subscription", 1, _subscriptionTarget.getMessages().size());
         assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
 
         /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */
@@ -306,9 +319,9 @@ public class SimpleAMQQueueTest extends 
         Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
 
         assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
-        assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size());
+        assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size());
         assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
-        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
 
     }
 
@@ -320,7 +333,14 @@ public class SimpleAMQQueueTest extends 
      */
     public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception
     {
-        _queue.registerSubscription(_subscription, false);
+
+        ServerMessage messageA = createMessage(new Long(24));
+        ServerMessage messageB = createMessage(new Long(25));
+        ServerMessage messageC = createMessage(new Long(26));
+
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+                                                    EnumSet.of(Subscription.Option.ACQUIRES,
+                                                               Subscription.Option.SEES_REQUEUES));
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
         Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -331,10 +351,6 @@ public class SimpleAMQQueueTest extends 
             }
         };
 
-        ServerMessage messageA = createMessage(new Long(24));
-        ServerMessage messageB = createMessage(new Long(25));
-        ServerMessage messageC = createMessage(new Long(26));
-
         /* Enqueue three messages */
 
         _queue.enqueue(messageA, postEnqueueAction);
@@ -343,7 +359,7 @@ public class SimpleAMQQueueTest extends 
 
         Thread.sleep(150);  // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to subscription", 3, _subscription.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size());
         assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
         assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
         assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -355,11 +371,11 @@ public class SimpleAMQQueueTest extends 
 
         Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to subscription", 5, _subscription.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to subscription", 5, _subscriptionTarget.getMessages().size());
         assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
         assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
         assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
-        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
+        assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext) _subscription.getQueueContext()).getReleasedEntry());
     }
 
 
@@ -369,11 +385,21 @@ public class SimpleAMQQueueTest extends 
      */
     public void testReleaseForQueueWithMultipleSubscriptions() throws Exception
     {
-        MockSubscription subscription1 = new MockSubscription();
-        MockSubscription subscription2 = new MockSubscription();
+        ServerMessage messageA = createMessage(new Long(24));
+        ServerMessage messageB = createMessage(new Long(25));
+
+        MockSubscription target1 = new MockSubscription();
+        MockSubscription target2 = new MockSubscription();
+
+
+        Subscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test",
+                                                                 EnumSet.of(Subscription.Option.ACQUIRES,
+                                                                            Subscription.Option.SEES_REQUEUES));
+
+        Subscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test",
+                                                                 EnumSet.of(Subscription.Option.ACQUIRES,
+                                                                            Subscription.Option.SEES_REQUEUES));
 
-        _queue.registerSubscription(subscription1, false);
-        _queue.registerSubscription(subscription2, false);
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
         Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -384,8 +410,6 @@ public class SimpleAMQQueueTest extends 
             }
         };
 
-        ServerMessage messageA = createMessage(new Long(24));
-        ServerMessage messageB = createMessage(new Long(25));
 
         /* Enqueue two messages */
 
@@ -394,31 +418,36 @@ public class SimpleAMQQueueTest extends 
 
         Thread.sleep(150);  // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to both after enqueue", 2, subscription1.getMessages().size() + subscription2.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to both after enqueue",
+                     2,
+                     target1.getMessages().size() + target2.getMessages().size());
 
         /* Now release the first message only, causing it to be requeued */
         queueEntries.get(0).release();
 
         Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to both subscriptions after release",
+                     3,
+                     target1.getMessages().size() + target2.getMessages().size());
         assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext()).getReleasedEntry());
         assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext()).getReleasedEntry());
     }
 
     public void testExclusiveConsumer() throws AMQException
     {
+        ServerMessage messageA = createMessage(new Long(24));
         // Check adding an exclusive subscription adds it to the queue
-        _queue.registerSubscription(_subscription, true);
-        assertEquals("Subscription did not get queue", _queue,
-                _subscription.getQueue());
+
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+                                                    EnumSet.of(Subscription.Option.EXCLUSIVE));
+
         assertEquals("Queue does not have consumer", 1,
-                _queue.getConsumerCount());
+                     _queue.getConsumerCount());
         assertEquals("Queue does not have active consumer", 1,
-                _queue.getActiveConsumerCount());
+                     _queue.getActiveConsumerCount());
 
         // Check sending a message ends up with the subscriber
-        ServerMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA);
         try
         {
@@ -430,11 +459,14 @@ public class SimpleAMQQueueTest extends 
         assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
 
         // Check we cannot add a second subscriber to the queue
-        Subscription subB = new MockSubscription();
+        MockSubscription subB = new MockSubscription();
         Exception ex = null;
         try
         {
-            _queue.registerSubscription(subB, false);
+
+            _queue.registerSubscription(subB, null, messageA.getClass(), "test",
+                                        EnumSet.noneOf(Subscription.Option.class));
+
         }
         catch (AMQException e)
         {
@@ -445,10 +477,15 @@ public class SimpleAMQQueueTest extends 
         // Check we cannot add an exclusive subscriber to a queue with an
         // existing subscription
         _queue.unregisterSubscription(_subscription);
-        _queue.registerSubscription(_subscription, false);
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
+                                                    EnumSet.noneOf(Subscription.Option.class));
+
         try
         {
-            _queue.registerSubscription(subB, true);
+
+            _subscription = _queue.registerSubscription(subB, null, messageA.getClass(), "test",
+                                                        EnumSet.of(Subscription.Option.EXCLUSIVE));
+
         }
         catch (AMQException e)
         {
@@ -462,8 +499,11 @@ public class SimpleAMQQueueTest extends 
        _queue.stop();
        _queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP);
        _queue.setDeleteOnNoConsumers(true);
-       _queue.registerSubscription(_subscription, false);
-       ServerMessage message = createMessage(new Long(25));
+
+        ServerMessage message = createMessage(new Long(25));
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test",
+                                                    EnumSet.noneOf(Subscription.Option.class));
+
        _queue.enqueue(message);
        _queue.unregisterSubscription(_subscription);
        assertTrue("Queue was not deleted when subscription was removed",
@@ -472,9 +512,12 @@ public class SimpleAMQQueueTest extends 
 
     public void testResend() throws Exception
     {
-        _queue.registerSubscription(_subscription, false);
         Long id = new Long(26);
         ServerMessage message = createMessage(id);
+
+        _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test",
+                                                    EnumSet.noneOf(Subscription.Option.class));
+
         _queue.enqueue(message);
         QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
         entry.setRedelivered();
@@ -649,18 +692,21 @@ public class SimpleAMQQueueTest extends 
         // in. Bias over 50% of the messages to the first subscription so that
         // the later subscriptions reject them and report being done before
         // the first subscription as the processQueue method proceeds.
-        List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3);
-        List<QueueEntry> msgListSub2 = createEntriesList(msg4);
-        List<QueueEntry> msgListSub3 = createEntriesList(msg5);
+        List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3);
+        List<String> msgListSub2 = createEntriesList(msg4);
+        List<String> msgListSub3 = createEntriesList(msg5);
 
         MockSubscription sub1 = new MockSubscription(msgListSub1);
         MockSubscription sub2 = new MockSubscription(msgListSub2);
         MockSubscription sub3 = new MockSubscription(msgListSub3);
 
         // register the subscriptions
-        testQueue.registerSubscription(sub1, false);
-        testQueue.registerSubscription(sub2, false);
-        testQueue.registerSubscription(sub3, false);
+        testQueue.registerSubscription(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
+                                       EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
+        testQueue.registerSubscription(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
+                                       EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
+        testQueue.registerSubscription(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
+                                       EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
 
         //check that no messages have been delivered to the
         //subscriptions during registration
@@ -680,9 +726,9 @@ public class SimpleAMQQueueTest extends 
         });
 
         // check expected messages delivered to correct consumers
-        verifyReceivedMessages(msgListSub1, sub1.getMessages());
-        verifyReceivedMessages(msgListSub2, sub2.getMessages());
-        verifyReceivedMessages(msgListSub3, sub3.getMessages());
+        verifyReceivedMessages(Arrays.asList(msg1,msg2,msg3), sub1.getMessages());
+        verifyReceivedMessages(Collections.singletonList(msg4), sub2.getMessages());
+        verifyReceivedMessages(Collections.singletonList(msg5), sub3.getMessages());
     }
 
     /**
@@ -883,7 +929,7 @@ public class SimpleAMQQueueTest extends 
         try
         {
             // subscribe
-            testQueue.registerSubscription(subscription, false);
+            testQueue.registerSubscription(subscription, null, entries.get(0).getMessage().getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
 
             // process queue
             testQueue.processQueue(new QueueRunner(testQueue)
@@ -907,7 +953,7 @@ public class SimpleAMQQueueTest extends 
         {
             Thread.currentThread().interrupt();
         }
-        List<QueueEntry> expected = createEntriesList(entries.get(0), entries.get(2), entries.get(3));
+        List<QueueEntry> expected = Arrays.asList(entries.get(0), entries.get(2), entries.get(3));
         verifyReceivedMessages(expected, subscription.getMessages());
     }
 
@@ -970,7 +1016,7 @@ public class SimpleAMQQueueTest extends 
         // register subscription
         try
         {
-            queue.registerSubscription(subscription, false);
+            queue.registerSubscription(subscription, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
         }
         catch (AMQException e)
         {
@@ -997,52 +1043,51 @@ public class SimpleAMQQueueTest extends 
         //verify adding an active subscription increases the count
         final MockSubscription subscription1 = new MockSubscription();
         subscription1.setActive(true);
+        subscription1.setState(SubscriptionTarget.State.ACTIVE);
         assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
-        queue.registerSubscription(subscription1, false);
+        queue.registerSubscription(subscription1, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
         //verify adding an inactive subscription doesn't increase the count
         final MockSubscription subscription2 = new MockSubscription();
         subscription2.setActive(false);
+        subscription2.setState(SubscriptionTarget.State.SUSPENDED);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-        queue.registerSubscription(subscription2, false);
+        queue.registerSubscription(subscription2, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
         //verify behaviour in face of expected state changes:
 
         //verify a subscription going suspended->active increases the count
-        queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
+        subscription2.setState(SubscriptionTarget.State.ACTIVE);
         assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
 
         //verify a subscription going active->suspended decreases the count
-        queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
+        subscription2.setState(SubscriptionTarget.State.SUSPENDED);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
         //verify a subscription going suspended->closed doesn't change the count
-        queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED);
-        assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-
-        //verify a subscription going active->closed  decreases the count
-        queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED);
-        assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
-
-        //verify behaviour in face of unexpected state changes:
-
-        //verify a subscription going closed->active increases the count
-        queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE);
+        subscription2.setState(SubscriptionTarget.State.CLOSED);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
         //verify a subscription going active->active doesn't change the count
-        queue.stateChanged(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
+        subscription1.setState(SubscriptionTarget.State.ACTIVE);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
-        //verify a subscription going closed->suspended doesn't change the count
-        queue.stateChanged(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED);
-        assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+        subscription1.setState(SubscriptionTarget.State.SUSPENDED);
+        assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
 
         //verify a subscription going suspended->suspended doesn't change the count
-        queue.stateChanged(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED);
+        subscription1.setState(SubscriptionTarget.State.SUSPENDED);
+        assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
+        subscription1.setState(SubscriptionTarget.State.ACTIVE);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+        //verify a subscription going active->closed  decreases the count
+        subscription1.setState(SubscriptionTarget.State.CLOSED);
+        assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
     }
 
     public void testNotificationFiredOnEnqueue() throws Exception
@@ -1167,12 +1212,12 @@ public class SimpleAMQQueueTest extends 
         return entry;
     }
 
-    private List<QueueEntry> createEntriesList(QueueEntry... entries)
+    private List<String> createEntriesList(QueueEntry... entries)
     {
-        ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>();
+        ArrayList<String> entriesList = new ArrayList<String>();
         for (QueueEntry entry : entries)
         {
-            entriesList.add(entry);
+            entriesList.add(entry.getMessage().getMessageHeader().getMessageId());
         }
         return entriesList;
     }
@@ -1197,7 +1242,7 @@ public class SimpleAMQQueueTest extends 
 
     public MockSubscription getSubscription()
     {
-        return _subscription;
+        return _subscriptionTarget;
     }
 
     public Map<String,Object> getArguments()
@@ -1213,14 +1258,15 @@ public class SimpleAMQQueueTest extends 
 
     protected ServerMessage createMessage(Long id) throws AMQException
     {
+        AMQMessageHeader header = mock(AMQMessageHeader.class);
+        when(header.getMessageId()).thenReturn(String.valueOf(id));
         ServerMessage message = mock(ServerMessage.class);
         when(message.getMessageNumber()).thenReturn(id);
+        when(message.getMessageHeader()).thenReturn(header);
 
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);
 
-        AMQMessageHeader hdr = mock(AMQMessageHeader.class);
-        when(message.getMessageHeader()).thenReturn(hdr);
 
         when(message.newReference()).thenReturn(ref);
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Mon Feb  3 23:28:12 2014
@@ -23,6 +23,10 @@ package org.apache.qpid.server.subscript
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.MessageFilter;
+import org.apache.qpid.server.filter.SimpleFilterManager;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.model.Port;
@@ -42,36 +46,34 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-public class MockSubscription implements Subscription
+public class MockSubscription implements SubscriptionTarget
 {
 
+    private final List<String> _messageIds;
     private boolean _closed = false;
     private String tag = "mocktag";
     private AMQQueue queue = null;
-    private StateChangeListener<Subscription, State> _listener = null;
-    private volatile AMQQueue.Context _queueContext = null;
+    private StateChangeListener<SubscriptionTarget, State> _listener = null;
     private State _state = State.ACTIVE;
     private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
     private final Lock _stateChangeLock = new ReentrantLock();
-    private List<QueueEntry> _acceptEntries = null;
-
-    private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
 
     private static final AtomicLong idGenerator = new AtomicLong(0);
     // Create a simple ID that increments for every new Subscription
-    private final long _subscriptionID = idGenerator.getAndIncrement();
     private boolean _isActive = true;
+    private Subscription _subscription;
 
     public MockSubscription()
     {
+        _messageIds = null;
     }
 
-    public MockSubscription(List<QueueEntry> acceptEntries)
+    public MockSubscription(List<String> messageIds)
     {
-        _acceptEntries = acceptEntries;
+        _messageIds = messageIds;
     }
 
-    public void close()
+    public boolean close()
     {
         _closed = true;
         if (_listener != null)
@@ -79,6 +81,7 @@ public class MockSubscription implements
             _listener.stateChanged(this, _state, State.CLOSED);
         }
         _state = State.CLOSED;
+        return true;
     }
 
     public String getName()
@@ -86,45 +89,26 @@ public class MockSubscription implements
         return tag;
     }
 
-    @Override
-    public void flush() throws AMQException
-    {
-
-    }
-
-    public long getSubscriptionID()
-    {
-        return _subscriptionID;
-    }
-
-    public AMQQueue.Context getQueueContext()
-    {
-        return _queueContext;
-    }
-
-    public SubscriptionAcquiredState getOwningState()
-    {
-        return _owningState;
-    }
-
-    public LogActor getLogActor()
-    {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public boolean isTransient()
-    {
-        return false;
-    }
-
-    public long getBytesOut()
-    {
-        return 0;  // TODO - Implement
-    }
-
-    public long getMessagesOut()
+    public FilterManager getFilters()
     {
-        return 0;  // TODO - Implement
+        if(_messageIds != null)
+        {
+            SimpleFilterManager filters = new SimpleFilterManager();
+            filters.add(new MessageFilter()
+            {
+                @Override
+                public boolean matches(final Filterable message)
+                {
+                    final String messageId = message.getMessageHeader().getMessageId();
+                    return _messageIds.contains(messageId);
+                }
+            });
+            return filters;
+        }
+        else
+        {
+            return null;
+        }
     }
 
     public long getUnacknowledgedBytes()
@@ -147,62 +131,18 @@ public class MockSubscription implements
         return new MockSessionModel();
     }
 
-    public boolean trySendLock()
-    {
-        return _stateChangeLock.tryLock();
-    }
-
-
-    public void getSendLock()
-    {
-        _stateChangeLock.lock();
-    }
-
-    public boolean hasInterest(QueueEntry entry)
-    {
-        if(_acceptEntries != null)
-        {
-            //simulate selector behaviour, only signal
-            //interest in the dictated queue entries
-            return _acceptEntries.contains(entry);
-        }
-
-        return true;
-    }
-
     public boolean isActive()
     {
         return _isActive ;
     }
 
-    public void set(String key, Object value)
-    {
-    }
 
-    public Object get(String key)
-    {
-        return null;
-    }
-
-    public boolean isAutoClose()
-    {
-        return false;
-    }
 
     public boolean isClosed()
     {
         return _closed;
     }
 
-    public boolean acquires()
-    {
-        return true;
-    }
-
-    public boolean seesRequeues()
-    {
-        return true;
-    }
 
     public boolean isSuspended()
     {
@@ -213,11 +153,6 @@ public class MockSubscription implements
     {
     }
 
-    public void releaseSendLock()
-    {
-        _stateChangeLock.unlock();
-    }
-
     public void restoreCredit(QueueEntry queueEntry)
     {
     }
@@ -236,33 +171,37 @@ public class MockSubscription implements
 
     }
 
-    public void setQueueContext(AMQQueue.Context queueContext)
+    public State getState()
     {
-        _queueContext = queueContext;
+        return _state;
     }
 
-    public void setQueue(AMQQueue queue, boolean exclusive)
+    @Override
+    public void subscriptionRegistered(final Subscription sub)
     {
-        this.queue = queue;
+        _subscription = sub;
     }
 
-    public void setNoLocal(boolean noLocal)
+    @Override
+    public void subscriptionRemoved(final Subscription sub)
     {
-    }
 
-    public void setStateListener(StateChangeListener<Subscription, State> listener)
-    {
-        this._listener = listener;
     }
 
-    public State getState()
+    public void setState(State state)
     {
-        return _state;
+        State oldState = _state;
+        _state = state;
+        if(_listener != null)
+        {
+            _listener.stateChanged(this, oldState, state);
+        }
     }
 
-    public boolean wouldSuspend(QueueEntry msg)
+    @Override
+    public void setStateListener(final StateChangeListener<SubscriptionTarget, State> listener)
     {
-        return false;
+        _listener = listener;
     }
 
     public ArrayList<QueueEntry> getMessages()
@@ -270,13 +209,15 @@ public class MockSubscription implements
         return messages;
     }
 
-    public boolean isSessionTransactional()
+
+    public void queueEmpty() throws AMQException
     {
-        return false;
     }
 
-    public void queueEmpty() throws AMQException
+    @Override
+    public boolean allocateCredit(final QueueEntry msg)
     {
+        return true;
     }
 
     public void setActive(final boolean isActive)

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/SubscriptionListTest.java Mon Feb  3 23:28:12 2014
@@ -20,25 +20,29 @@
  */
 package org.apache.qpid.server.subscription;
 
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.subscription.SubscriptionList.SubscriptionNode;
 import org.apache.qpid.server.subscription.SubscriptionList.SubscriptionNodeIterator;
 import org.apache.qpid.test.utils.QpidTestCase;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class SubscriptionListTest extends QpidTestCase
 {
     private SubscriptionList _subList;
-    private MockSubscription _sub1;
-    private MockSubscription _sub2;
-    private MockSubscription _sub3;
+    private Subscription _sub1;
+    private Subscription _sub2;
+    private Subscription _sub3;
     private SubscriptionNode _node;
 
     protected void setUp()
     {
         _subList = new SubscriptionList();
 
-        _sub1 = new MockSubscription();
-        _sub2 = new MockSubscription();
-        _sub3 = new MockSubscription();
+        _sub1 = newMockSubscription();
+        _sub2 = newMockSubscription();
+        _sub3 = newMockSubscription();
 
         _subList.add(_sub1);
         _subList.add(_sub2);
@@ -47,6 +51,15 @@ public class SubscriptionListTest extend
         _node = _subList.getHead();
     }
 
+
+    private Subscription newMockSubscription()
+    {
+        final Subscription subscription = mock(Subscription.class);
+        when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription));
+        when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
+        return subscription;
+    }
+
     /**
      * Test that if the first (non-head) node in the list is deleted (but is still present),
      * it is not returned when searching through the list for the next viable node, and the
@@ -177,9 +190,9 @@ public class SubscriptionListTest extend
 
         assertEquals("Unexpected size result", 0, subList.size());
 
-        Subscription sub1 = new MockSubscription();
-        Subscription sub2 = new MockSubscription();
-        Subscription sub3 = new MockSubscription();
+        Subscription sub1 = newMockSubscription();
+        Subscription sub2 = newMockSubscription();
+        Subscription sub3 = newMockSubscription();
 
         subList.add(sub1);
         assertEquals("Unexpected size result", 1, subList.size());
@@ -253,7 +266,7 @@ public class SubscriptionListTest extend
      */
     public void testRemoveNonexistentNode()
     {
-        Subscription sub4 = new MockSubscription();
+        Subscription sub4 = newMockSubscription();
         assertNull("Should not have been a node present for the subscription", getNodeForSubscription(_subList, sub4));
         assertFalse("Removing subscription node should not have succeeded", _subList.remove(sub4));
         assertEquals("Unexpected number of nodes", 3, countNodes(_subList));
@@ -324,7 +337,7 @@ public class SubscriptionListTest extend
         assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3));
 
         //add a new 4th subscription to the list
-        Subscription sub4 = new MockSubscription();
+        Subscription sub4 = newMockSubscription();
         _subList.add(sub4);
 
         //get the node out the list for the 4th subscription

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Mon Feb  3 23:28:12 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.util.EnumSet;
 import java.util.LinkedHashMap;
 import java.util.UUID;
 import org.apache.log4j.Logger;
@@ -265,16 +266,28 @@ public class ServerSessionDelegate exten
                                                                                  filterManager,
                                                                                  method.getArguments());
 
-                    Subscription sub = new DelegatingSubscription<SubscriptionTarget_0_10>(filterManager, MessageTransferMessage.class,
-                                                                                           method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED,
-                                                                                           method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT,destination,false,target);
-
-                    target.setSubscription(sub);
-
                     ((ServerSession)session).register(destination, target);
                     try
                     {
-                        queue.registerSubscription(sub, method.getExclusive());
+                        EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+                        if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
+                        {
+                            options.add(Subscription.Option.ACQUIRES);
+                        }
+                        if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+                        {
+                            options.add(Subscription.Option.SEES_REQUEUES);
+                        }
+                        if(method.getExclusive())
+                        {
+                            options.add(Subscription.Option.EXCLUSIVE);
+                        }
+                        Subscription sub =
+                                queue.registerSubscription(target,
+                                                           filterManager,
+                                                           MessageTransferMessage.class,
+                                                           destination,
+                                                           options);
                     }
                     catch (AMQQueue.ExistingExclusiveSubscription existing)
                     {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java Mon Feb  3 23:28:12 2014
@@ -93,11 +93,6 @@ public class SubscriptionTarget_0_10 ext
     }
 
 
-    public void setSubscription(Subscription subscription)
-    {
-        _subscription = subscription;
-    }
-
     public Subscription getSubscription()
     {
         return _subscription;
@@ -570,6 +565,17 @@ public class SubscriptionTarget_0_10 ext
     }
 
 
+    @Override
+    public void subscriptionRegistered(final Subscription sub)
+    {
+        _subscription = sub;
+    }
+
+    @Override
+    public void subscriptionRemoved(final Subscription sub)
+    {
+    }
+
     public long getUnacknowledgedBytes()
     {
         return _unacknowledgedBytes.longValue();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Mon Feb  3 23:28:12 2014
@@ -21,19 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -42,6 +30,7 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -55,6 +44,7 @@ import org.apache.qpid.server.Transactio
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.logging.LogActor;
@@ -81,6 +71,7 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionTarget;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -123,7 +114,7 @@ public class AMQChannel implements AMQSe
     private IncomingMessage _currentMessage;
 
     /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
-    private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
+    private final Map<AMQShortString, SubscriptionTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, SubscriptionTarget_0_8>();
 
     private final MessageStore _messageStore;
 
@@ -498,9 +489,10 @@ public class AMQChannel implements AMQSe
     }
 
 
-    public Subscription getSubscription(AMQShortString subscription)
+    public Subscription getSubscription(AMQShortString tag)
     {
-        return _tag2SubscriptionMap.get(subscription);
+        final SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
+        return target == null ? null : target.getSubscription();
     }
 
     /**
@@ -526,34 +518,57 @@ public class AMQChannel implements AMQSe
             tag = new AMQShortString("sgen_" + getNextConsumerTag());
         }
 
-        if (_tag2SubscriptionMap.containsKey(tag))
+        if (_tag2SubscriptionTargetMap.containsKey(tag))
         {
             throw new AMQException("Consumer already exists with same tag: " + tag);
         }
 
-         Subscription subscription =
-                SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
+        SubscriptionTarget_0_8 target;
+        EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
 
+        if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
+        {
+            target = SubscriptionTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+            options.add(Subscription.Option.TRANSIENT);
+        }
+        else if(acks)
+        {
+            target = SubscriptionTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+            options.add(Subscription.Option.ACQUIRES);
+            options.add(Subscription.Option.SEES_REQUEUES);
+        }
+        else
+        {
+            target = SubscriptionTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+            options.add(Subscription.Option.ACQUIRES);
+            options.add(Subscription.Option.SEES_REQUEUES);
+        }
+
+        if(exclusive)
+        {
+            options.add(Subscription.Option.EXCLUSIVE);
+        }
 
         // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
         // We add before we register as the Async Delivery process may AutoClose the subscriber
         // so calling _cT2QM.remove before we have done put which was after the register succeeded.
         // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
 
-        _tag2SubscriptionMap.put(tag, subscription);
+        _tag2SubscriptionTargetMap.put(tag, target);
 
         try
         {
-            queue.registerSubscription(subscription, exclusive);
+            Subscription sub =
+                    queue.registerSubscription(target, FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), AMQMessage.class, AMQShortString.toString(tag), options);
         }
         catch (AMQException e)
         {
-            _tag2SubscriptionMap.remove(tag);
+            _tag2SubscriptionTargetMap.remove(tag);
             throw e;
         }
         catch (RuntimeException e)
         {
-            _tag2SubscriptionMap.remove(tag);
+            _tag2SubscriptionTargetMap.remove(tag);
             throw e;
         }
         return tag;
@@ -568,7 +583,8 @@ public class AMQChannel implements AMQSe
     public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
     {
 
-        Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
+        SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
+        Subscription sub = target == null ? null : target.getSubscription();
         if (sub != null)
         {
             try
@@ -634,7 +650,7 @@ public class AMQChannel implements AMQSe
     {
         if (_logger.isInfoEnabled())
         {
-            if (!_tag2SubscriptionMap.isEmpty())
+            if (!_tag2SubscriptionTargetMap.isEmpty())
             {
                 _logger.info("Unsubscribing all consumers on channel " + toString());
             }
@@ -644,14 +660,14 @@ public class AMQChannel implements AMQSe
             }
         }
 
-        for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
+        for (Map.Entry<AMQShortString, SubscriptionTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
         {
             if (_logger.isInfoEnabled())
             {
                 _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
             }
 
-            Subscription sub = me.getValue();
+            Subscription sub = me.getValue().getSubscription();
 
             try
             {
@@ -665,7 +681,7 @@ public class AMQChannel implements AMQSe
 
         }
 
-        _tag2SubscriptionMap.clear();
+        _tag2SubscriptionTargetMap.clear();
     }
 
     /**
@@ -977,9 +993,9 @@ public class AMQChannel implements AMQSe
             if (wasSuspended)
             {
                 // may need to deliver queued messages
-                for (Subscription s : _tag2SubscriptionMap.values())
+                for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values())
                 {
-                    s.getQueue().deliverAsync(s);
+                    s.getSubscription().getQueue().deliverAsync(s.getSubscription());
                 }
             }
 
@@ -993,15 +1009,15 @@ public class AMQChannel implements AMQSe
             if (!wasSuspended)
             {
                 // may need to deliver queued messages
-                for (Subscription s : _tag2SubscriptionMap.values())
+                for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values())
                 {
                     try
                     {
-                        s.getSendLock();
+                        s.getSubscription().getSendLock();
                     }
                     finally
                     {
-                        s.releaseSendLock();
+                        s.getSubscription().releaseSendLock();
                     }
                 }
             }
@@ -1078,10 +1094,10 @@ public class AMQChannel implements AMQSe
         boolean requiresSuspend = _suspended.compareAndSet(false,true);
 
         // ensure all subscriptions have seen the change to the channel state
-        for(Subscription sub : _tag2SubscriptionMap.values())
+        for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
         {
-            sub.getSendLock();
-            sub.releaseSendLock();
+            sub.getSubscription().getSendLock();
+            sub.getSubscription().releaseSendLock();
         }
 
         try
@@ -1116,9 +1132,9 @@ public class AMQChannel implements AMQSe
         if(requiresSuspend)
         {
             _suspended.set(false);
-            for(Subscription sub : _tag2SubscriptionMap.values())
+            for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
             {
-                sub.getQueue().deliverAsync(sub);
+                sub.getSubscription().getQueue().deliverAsync(sub.getSubscription());
             }
 
         }
@@ -1672,6 +1688,6 @@ public class AMQChannel implements AMQSe
     @Override
     public int getConsumerCount()
     {
-        return _tag2SubscriptionMap.size();
+        return _tag2SubscriptionTargetMap.size();
     }
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java?rev=1564130&r1=1564129&r2=1564130&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionTarget_0_8.java Mon Feb  3 23:28:12 2014
@@ -76,6 +76,13 @@ public abstract class SubscriptionTarget
     private Subscription _subscription;
 
 
+    public static SubscriptionTarget_0_8 createBrowserTarget(AMQChannel channel,
+                                                             AMQShortString consumerTag, FieldTable filters,
+                                                             FlowCreditManager creditManager) throws AMQException
+    {
+        return new BrowserSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+    }
+
     static final class BrowserSubscription extends SubscriptionTarget_0_8
     {
         public BrowserSubscription(AMQChannel channel,
@@ -120,6 +127,22 @@ public abstract class SubscriptionTarget
 
     }
 
+    public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel,
+                                                           AMQShortString consumerTag, FieldTable filters,
+                                                           FlowCreditManager creditManager) throws AMQException
+    {
+        return new NoAckSubscription(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+    }
+
+    public static SubscriptionTarget_0_8 createNoAckTarget(AMQChannel channel,
+                                                           AMQShortString consumerTag, FieldTable filters,
+                                                           FlowCreditManager creditManager,
+                                                           ClientDeliveryMethod deliveryMethod,
+                                                           RecordDeliveryMethod recordMethod) throws AMQException
+    {
+        return new NoAckSubscription(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
+    }
+
     public static class NoAckSubscription extends SubscriptionTarget_0_8
     {
         private final AutoCommitTransaction _txn;
@@ -220,6 +243,26 @@ public abstract class SubscriptionTarget
 
     }
 
+
+    public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel,
+                                                         AMQShortString consumerTag, FieldTable filters,
+                                                         FlowCreditManager creditManager)
+            throws AMQException
+    {
+        return new AckSubscription(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
+    }
+
+
+    public static SubscriptionTarget_0_8 createAckTarget(AMQChannel channel,
+                                                         AMQShortString consumerTag, FieldTable filters,
+                                                         FlowCreditManager creditManager,
+                                                         ClientDeliveryMethod deliveryMethod,
+                                                         RecordDeliveryMethod recordMethod)
+            throws AMQException
+    {
+        return new AckSubscription(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
+    }
+
     static final class AckSubscription extends SubscriptionTarget_0_8
     {
         public AckSubscription(AMQChannel channel,
@@ -317,14 +360,20 @@ public abstract class SubscriptionTarget
         }
     }
 
-    public void setSubscription(Subscription subscription)
+    public Subscription getSubscription()
     {
-        _subscription = subscription;
+        return _subscription;
     }
 
-    public Subscription getSubscription()
+    @Override
+    public void subscriptionRemoved(final Subscription sub)
     {
-        return _subscription;
+    }
+
+    @Override
+    public void subscriptionRegistered(final Subscription sub)
+    {
+        _subscription = sub;
     }
 
     public AMQSessionModel getSessionModel()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org