You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/05 11:29:57 UTC

svn commit: r1564703 [3/4] - in /qpid/branches/java-broker-amqp-1-0-management/java: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/logging/actors/ broker-core/src/main/java/org/apache/qpid/s...

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Wed Feb  5 10:29:55 2014
@@ -29,8 +29,8 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -207,15 +207,15 @@ public class MockAMQQueue implements AMQ
     }
 
     @Override
-    public Subscription registerSubscription(final SubscriptionTarget target,
-                                             final FilterManager filters,
-                                             final Class<? extends ServerMessage> messageClass,
-                                             final String consumerName,
-                                             final EnumSet<Subscription.Option> options) throws AMQException
-    {
-        return new QueueSubscription(filters, messageClass, options.contains(Subscription.Option.ACQUIRES),
-                                          options.contains(Subscription.Option.SEES_REQUEUES), consumerName,
-                                          options.contains(Subscription.Option.TRANSIENT), target );
+    public Consumer addConsumer(final ConsumerTarget target,
+                                final FilterManager filters,
+                                final Class<? extends ServerMessage> messageClass,
+                                final String consumerName,
+                                final EnumSet<Consumer.Option> options) throws AMQException
+    {
+        return new QueueConsumer(filters, messageClass, options.contains(Consumer.Option.ACQUIRES),
+                                          options.contains(Consumer.Option.SEES_REQUEUES), consumerName,
+                                          options.contains(Consumer.Option.TRANSIENT), target );
     }
 
     public String getName()
@@ -224,22 +224,18 @@ public class MockAMQQueue implements AMQ
     }
 
 
-    public void unregisterSubscription(Subscription subscription) throws AMQException
-    {
-
-    }
 
-    public Collection<Subscription> getConsumers()
+    public Collection<Consumer> getConsumers()
     {
         return Collections.emptyList();
     }
 
-    public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+    public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
     {
 
     }
 
-    public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+    public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
     {
 
     }
@@ -254,7 +250,7 @@ public class MockAMQQueue implements AMQ
         return 0;
     }
 
-    public boolean hasExclusiveSubscriber()
+    public boolean hasExclusiveConsumer()
     {
         return false;
     }
@@ -318,11 +314,11 @@ public class MockAMQQueue implements AMQ
     {
     }
 
-    public void dequeue(QueueEntry entry, Subscription sub)
+    public void dequeue(QueueEntry entry, Consumer sub)
     {
     }
 
-    public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException
+    public boolean resend(QueueEntry entry, Consumer consumer) throws AMQException
     {
         return false;
     }
@@ -431,12 +427,12 @@ public class MockAMQQueue implements AMQ
         return null;
     }
 
-    public void flushSubscription(Subscription sub) throws AMQException
+    public void flushConsumer(Consumer sub) throws AMQException
     {
 
     }
 
-    public void deliverAsync(Subscription sub)
+    public void deliverAsync(Consumer sub)
     {
 
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Wed Feb  5 10:29:55 2014
@@ -26,7 +26,7 @@ import org.apache.qpid.server.message.AM
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -41,7 +41,7 @@ public class MockQueueEntry implements Q
         return false;
     }
 
-    public boolean acquire(Subscription sub)
+    public boolean acquire(Consumer sub)
     {
         return false;
     }
@@ -52,12 +52,12 @@ public class MockQueueEntry implements Q
         return 0;
     }
 
-    public boolean acquiredBySubscription()
+    public boolean acquiredByConsumer()
     {
         return false;
     }
 
-    public boolean isAcquiredBy(Subscription subscription)
+    public boolean isAcquiredBy(Consumer consumer)
     {
         return false;
     }
@@ -87,7 +87,7 @@ public class MockQueueEntry implements Q
         return false;
     }
 
-    public Subscription getDeliveredSubscription()
+    public Consumer getDeliveredConsumer()
     {
         return null;
     }
@@ -125,7 +125,7 @@ public class MockQueueEntry implements Q
     }
 
 
-    public boolean isRejectedBy(Subscription subscription)
+    public boolean isRejectedBy(Consumer consumer)
     {
 
         return false;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Wed Feb  5 10:29:55 2014
@@ -21,10 +21,11 @@ package org.apache.qpid.server.queue;
 import junit.framework.TestCase;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.MessageInstance.EntryState;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 
 import java.lang.reflect.Field;
 
@@ -112,17 +113,17 @@ public abstract class QueueEntryImplTest
      */
     private void acquire()
     {
-        _queueEntry.acquire(newMockSubscription());
+        _queueEntry.acquire(newMockConsumer());
         assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
                 _queueEntry.isAcquired());
     }
 
-    private Subscription newMockSubscription()
+    private Consumer newMockConsumer()
     {
-        final Subscription subscription = mock(Subscription.class);
-        when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription));
-        when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
-        return subscription;
+        final Consumer consumer = mock(Consumer.class);
+        when(consumer.getOwningState()).thenReturn(new MessageInstance.ConsumerAcquiredState(consumer));
+        when(consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
+        return consumer;
     }
 
     /**
@@ -147,34 +148,34 @@ public abstract class QueueEntryImplTest
     }
 
     /**
-     * Tests rejecting a queue entry records the Subscription ID
-     * for later verification by isRejectedBy(subscriptionId).
+     * Tests rejecting a queue entry records the Consumer ID
+     * for later verification by isRejectedBy(consumerId).
      */
     public void testRejectAndRejectedBy()
     {
-        Subscription sub = newMockSubscription();
+        Consumer sub = newMockConsumer();
 
-        assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
-        assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired());
+        assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
+        assertFalse("Queue entry should not yet have been acquired by a consumer", _queueEntry.isAcquired());
 
-        //acquire, reject, and release the message using the subscription
+        //acquire, reject, and release the message using the consumer
         assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub));
         _queueEntry.reject();
         _queueEntry.release();
 
         //verify the rejection is recorded
-        assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
+        assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
 
-        //repeat rejection using a second subscription
-        Subscription sub2 = newMockSubscription();
+        //repeat rejection using a second consumer
+        Consumer sub2 = newMockConsumer();
 
-        assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2));
+        assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
         assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
         _queueEntry.reject();
 
-        //verify it still records being rejected by both subscriptions
-        assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
-        assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2));
+        //verify it still records being rejected by both consumers
+        assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
+        assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
     }
 
     /**

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Wed Feb  5 10:29:55 2014
@@ -45,9 +45,9 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -69,8 +69,8 @@ public class SimpleAMQQueueTest extends 
     private String _owner = "owner";
     private String _routingKey = "routing key";
     private DirectExchange _exchange;
-    private MockSubscription _subscriptionTarget = new MockSubscription();
-    private QueueSubscription _subscription;
+    private MockConsumer _consumerTarget = new MockConsumer();
+    private QueueConsumer _consumer;
     private Map<String,Object> _arguments = null;
 
     @Override
@@ -162,13 +162,13 @@ public class SimpleAMQQueueTest extends 
 
     }
 
-    public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException
+    public void testRegisterConsumerThenEnqueueMessage() throws AMQException
     {
         ServerMessage messageA = createMessage(new Long(24));
 
-        // Check adding a subscription adds it to the queue
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
-                                                    EnumSet.noneOf(Subscription.Option.class));
+        // Check adding a consumer adds it to the queue
+        _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+                                           EnumSet.noneOf(Consumer.Option.class));
         assertEquals("Queue does not have consumer", 1,
                      _queue.getConsumerCount());
         assertEquals("Queue does not have active consumer", 1,
@@ -183,49 +183,49 @@ public class SimpleAMQQueueTest extends 
         catch(InterruptedException e)
         {
         }
-        assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
-        assertNull(_subscription.getQueueContext().getReleasedEntry());
+        assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
+        assertNull(_consumer.getQueueContext().getReleasedEntry());
 
-        // Check removing the subscription removes it's information from the queue
-        _subscription.close();
-        assertTrue("Subscription still had queue", _subscriptionTarget.isClosed());
+        // Check removing the consumer removes it's information from the queue
+        _consumer.close();
+        assertTrue("Consumer still had queue", _consumerTarget.isClosed());
         assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
         assertFalse("Queue still has active consumer",
                     1 == _queue.getActiveConsumerCount());
 
         ServerMessage messageB = createMessage(new Long (25));
         _queue.enqueue(messageB);
-         assertNull(_subscription.getQueueContext());
+         assertNull(_consumer.getQueueContext());
 
     }
 
-    public void testEnqueueMessageThenRegisterSubscription() throws AMQException, InterruptedException
+    public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException
     {
         ServerMessage messageA = createMessage(new Long(24));
         _queue.enqueue(messageA);
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
-                                                    EnumSet.noneOf(Subscription.Option.class));
+        _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+                                           EnumSet.noneOf(Consumer.Option.class));
         Thread.sleep(150);
-        assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+        assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
         assertNull("There should be no releasedEntry after an enqueue",
-                   _subscription.getQueueContext().getReleasedEntry());
+                   _consumer.getQueueContext().getReleasedEntry());
     }
 
     /**
      * Tests enqueuing two messages.
      */
-    public void testEnqueueTwoMessagesThenRegisterSubscription() throws Exception
+    public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception
     {
         ServerMessage messageA = createMessage(new Long(24));
         ServerMessage messageB = createMessage(new Long(25));
         _queue.enqueue(messageA);
         _queue.enqueue(messageB);
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
-                                                    EnumSet.noneOf(Subscription.Option.class));
+        _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+                                           EnumSet.noneOf(Consumer.Option.class));
         Thread.sleep(150);
-        assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+        assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
         assertNull("There should be no releasedEntry after enqueues",
-                   _subscription.getQueueContext().getReleasedEntry());
+                   _consumer.getQueueContext().getReleasedEntry());
     }
 
     /**
@@ -240,9 +240,9 @@ public class SimpleAMQQueueTest extends 
         ServerMessage messageC = createMessage(new Long(26));
 
 
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
-                                                    EnumSet.of(Subscription.Option.ACQUIRES,
-                                                               Subscription.Option.SEES_REQUEUES));
+        _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+                                           EnumSet.of(Consumer.Option.ACQUIRES,
+                                                      Consumer.Option.SEES_REQUEUES));
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
         Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -261,7 +261,9 @@ public class SimpleAMQQueueTest extends 
 
         Thread.sleep(150);  // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to consumer",
+                     3,
+                     _consumerTarget.getMessages().size());
         assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
         assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
         assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -272,12 +274,14 @@ public class SimpleAMQQueueTest extends 
 
         Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to subscription", 4, _subscriptionTarget.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to consumer",
+                     4,
+                     _consumerTarget.getMessages().size());
         assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
         assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
         assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
         assertNull("releasedEntry should be cleared after requeue processed",
-                   _subscription.getQueueContext().getReleasedEntry());
+                   _consumer.getQueueContext().getReleasedEntry());
     }
 
     /**
@@ -289,9 +293,9 @@ public class SimpleAMQQueueTest extends 
     {
         ServerMessage messageA = createMessage(new Long(24));
 
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
-                                                    EnumSet.of(Subscription.Option.SEES_REQUEUES,
-                                                               Subscription.Option.ACQUIRES));
+        _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+                                           EnumSet.of(Consumer.Option.SEES_REQUEUES,
+                                                      Consumer.Option.ACQUIRES));
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
         Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -313,7 +317,9 @@ public class SimpleAMQQueueTest extends 
         int subFlushWaitTime = 150;
         Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to subscription", 1, _subscriptionTarget.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to consumer",
+                     1,
+                     _consumerTarget.getMessages().size());
         assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
 
         /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */
@@ -323,10 +329,12 @@ public class SimpleAMQQueueTest extends 
         Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
 
         assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
-        assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size());
+        assertEquals("Total number of messages sent should not have changed",
+                     1,
+                     _consumerTarget.getMessages().size());
         assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
         assertNull("releasedEntry should be cleared after requeue processed",
-                   _subscription.getQueueContext().getReleasedEntry());
+                   _consumer.getQueueContext().getReleasedEntry());
 
     }
 
@@ -343,9 +351,9 @@ public class SimpleAMQQueueTest extends 
         ServerMessage messageB = createMessage(new Long(25));
         ServerMessage messageC = createMessage(new Long(26));
 
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
-                                                    EnumSet.of(Subscription.Option.ACQUIRES,
-                                                               Subscription.Option.SEES_REQUEUES));
+        _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+                                           EnumSet.of(Consumer.Option.ACQUIRES,
+                                                      Consumer.Option.SEES_REQUEUES));
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
         Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -364,7 +372,9 @@ public class SimpleAMQQueueTest extends 
 
         Thread.sleep(150);  // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to consumer",
+                     3,
+                     _consumerTarget.getMessages().size());
         assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
         assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
         assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -376,35 +386,37 @@ public class SimpleAMQQueueTest extends 
 
         Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to subscription", 5, _subscriptionTarget.getMessages().size());
+        assertEquals("Unexpected total number of messages sent to consumer",
+                     5,
+                     _consumerTarget.getMessages().size());
         assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
         assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
         assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
         assertNull("releasedEntry should be cleared after requeue processed",
-                   _subscription.getQueueContext().getReleasedEntry());
+                   _consumer.getQueueContext().getReleasedEntry());
     }
 
 
     /**
-     * Tests that a release requeues an entry for a queue with multiple subscriptions.  Verifies that a
+     * Tests that a release requeues an entry for a queue with multiple consumers.  Verifies that a
      * requeue resends a message to a <i>single</i> subscriber.
      */
-    public void testReleaseForQueueWithMultipleSubscriptions() throws Exception
+    public void testReleaseForQueueWithMultipleConsumers() throws Exception
     {
         ServerMessage messageA = createMessage(new Long(24));
         ServerMessage messageB = createMessage(new Long(25));
 
-        MockSubscription target1 = new MockSubscription();
-        MockSubscription target2 = new MockSubscription();
+        MockConsumer target1 = new MockConsumer();
+        MockConsumer target2 = new MockConsumer();
 
 
-        QueueSubscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test",
-                                                                 EnumSet.of(Subscription.Option.ACQUIRES,
-                                                                            Subscription.Option.SEES_REQUEUES));
+        QueueConsumer consumer1 = _queue.addConsumer(target1, null, messageA.getClass(), "test",
+                                                         EnumSet.of(Consumer.Option.ACQUIRES,
+                                                                    Consumer.Option.SEES_REQUEUES));
 
-        QueueSubscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test",
-                                                                 EnumSet.of(Subscription.Option.ACQUIRES,
-                                                                            Subscription.Option.SEES_REQUEUES));
+        QueueConsumer consumer2 = _queue.addConsumer(target2, null, messageA.getClass(), "test",
+                                                         EnumSet.of(Consumer.Option.ACQUIRES,
+                                                                    Consumer.Option.SEES_REQUEUES));
 
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
@@ -433,22 +445,22 @@ public class SimpleAMQQueueTest extends 
 
         Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
 
-        assertEquals("Unexpected total number of messages sent to both subscriptions after release",
+        assertEquals("Unexpected total number of messages sent to both consumers after release",
                      3,
                      target1.getMessages().size() + target2.getMessages().size());
         assertNull("releasedEntry should be cleared after requeue processed",
-                   subscription1.getQueueContext().getReleasedEntry());
+                   consumer1.getQueueContext().getReleasedEntry());
         assertNull("releasedEntry should be cleared after requeue processed",
-                   subscription2.getQueueContext().getReleasedEntry());
+                   consumer2.getQueueContext().getReleasedEntry());
     }
 
     public void testExclusiveConsumer() throws AMQException
     {
         ServerMessage messageA = createMessage(new Long(24));
-        // Check adding an exclusive subscription adds it to the queue
+        // Check adding an exclusive consumer adds it to the queue
 
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
-                                                    EnumSet.of(Subscription.Option.EXCLUSIVE));
+        _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+                                           EnumSet.of(Consumer.Option.EXCLUSIVE));
 
         assertEquals("Queue does not have consumer", 1,
                      _queue.getConsumerCount());
@@ -464,16 +476,16 @@ public class SimpleAMQQueueTest extends 
         catch (InterruptedException e)
         {
         }
-        assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+        assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
 
         // Check we cannot add a second subscriber to the queue
-        MockSubscription subB = new MockSubscription();
+        MockConsumer subB = new MockConsumer();
         Exception ex = null;
         try
         {
 
-            _queue.registerSubscription(subB, null, messageA.getClass(), "test",
-                                        EnumSet.noneOf(Subscription.Option.class));
+            _queue.addConsumer(subB, null, messageA.getClass(), "test",
+                               EnumSet.noneOf(Consumer.Option.class));
 
         }
         catch (AMQException e)
@@ -483,16 +495,16 @@ public class SimpleAMQQueueTest extends 
         assertNotNull(ex);
 
         // Check we cannot add an exclusive subscriber to a queue with an
-        // existing subscription
-        _subscription.close();
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
-                                                    EnumSet.noneOf(Subscription.Option.class));
+        // existing consumer
+        _consumer.close();
+        _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+                                           EnumSet.noneOf(Consumer.Option.class));
 
         try
         {
 
-            _subscription = _queue.registerSubscription(subB, null, messageA.getClass(), "test",
-                                                        EnumSet.of(Subscription.Option.EXCLUSIVE));
+            _consumer = _queue.addConsumer(subB, null, messageA.getClass(), "test",
+                                               EnumSet.of(Consumer.Option.EXCLUSIVE));
 
         }
         catch (AMQException e)
@@ -509,12 +521,12 @@ public class SimpleAMQQueueTest extends 
        _queue.setDeleteOnNoConsumers(true);
 
         ServerMessage message = createMessage(new Long(25));
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test",
-                                                    EnumSet.noneOf(Subscription.Option.class));
+        _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
+                                           EnumSet.noneOf(Consumer.Option.class));
 
        _queue.enqueue(message);
-       _subscription.close();
-       assertTrue("Queue was not deleted when subscription was removed",
+       _consumer.close();
+       assertTrue("Queue was not deleted when consumer was removed",
                   _queue.isDeleted());
     }
 
@@ -523,13 +535,13 @@ public class SimpleAMQQueueTest extends 
         Long id = new Long(26);
         ServerMessage message = createMessage(id);
 
-        _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test",
-                                                    EnumSet.noneOf(Subscription.Option.class));
+        _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
+                                           EnumSet.noneOf(Consumer.Option.class));
 
         _queue.enqueue(message);
-        QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
+        QueueEntry entry = _consumer.getQueueContext().getLastSeenEntry();
         entry.setRedelivered();
-        _subscription.resend(entry);
+        _consumer.resend(entry);
 
     }
 
@@ -656,19 +668,19 @@ public class SimpleAMQQueueTest extends 
 
     /**
      * processQueue() is used when asynchronously delivering messages to
-     * subscriptions which could not be delivered immediately during the
+     * consumers which could not be delivered immediately during the
      * enqueue() operation.
      *
      * A defect within the method would mean that delivery of these messages may
      * not occur should the Runner stop before all messages have been processed.
      * Such a defect was discovered when Selectors were used such that one and
-     * only one subscription can/will accept any given messages, but multiple
-     * subscriptions are present, and one of the earlier subscriptions receives
+     * only one consumer can/will accept any given messages, but multiple
+     * consumers are present, and one of the earlier consumers receives
      * more messages than the others.
      *
      * This test is to validate that the processQueue() method is able to
      * correctly deliver all of the messages present for asynchronous delivery
-     * to subscriptions in such a scenario.
+     * to consumers in such a scenario.
      */
     public void testProcessQueueWithUniqueSelectors() throws Exception
     {
@@ -677,10 +689,10 @@ public class SimpleAMQQueueTest extends 
                                                       false, false, _virtualHost, factory, null)
         {
             @Override
-            public void deliverAsync(QueueSubscription sub)
+            public void deliverAsync(QueueConsumer sub)
             {
                 // do nothing, i.e prevent deliveries by the SubFlushRunner
-                // when registering the new subscriptions
+                // when registering the new consumers
             }
         };
 
@@ -696,28 +708,28 @@ public class SimpleAMQQueueTest extends 
         QueueEntry msg4 = list.add(createMessage(4L));
         QueueEntry msg5 = list.add(createMessage(5L));
 
-        // Create lists of the entries each subscription should be interested
-        // in.Bias over 50% of the messages to the first subscription so that
-        // the later subscriptions reject them and report being done before
-        // the first subscription as the processQueue method proceeds.
+        // Create lists of the entries each consumer should be interested
+        // in.Bias over 50% of the messages to the first consumer so that
+        // the later consumers reject them and report being done before
+        // the first consumer as the processQueue method proceeds.
         List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3);
         List<String> msgListSub2 = createEntriesList(msg4);
         List<String> msgListSub3 = createEntriesList(msg5);
 
-        MockSubscription sub1 = new MockSubscription(msgListSub1);
-        MockSubscription sub2 = new MockSubscription(msgListSub2);
-        MockSubscription sub3 = new MockSubscription(msgListSub3);
-
-        // register the subscriptions
-        testQueue.registerSubscription(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
-                                       EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
-        testQueue.registerSubscription(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
-                                       EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
-        testQueue.registerSubscription(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
-                                       EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
+        MockConsumer sub1 = new MockConsumer(msgListSub1);
+        MockConsumer sub2 = new MockConsumer(msgListSub2);
+        MockConsumer sub3 = new MockConsumer(msgListSub3);
+
+        // register the consumers
+        testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
+                              EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+        testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
+                              EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+        testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
+                              EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
 
         //check that no messages have been delivered to the
-        //subscriptions during registration
+        //consumers during registration
         assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
         assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
         assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
@@ -904,7 +916,7 @@ public class SimpleAMQQueueTest extends 
                 false, "testOwner", false, false, _virtualHost, null)
         {
             @Override
-            public void deliverAsync(QueueSubscription sub)
+            public void deliverAsync(QueueConsumer sub)
             {
                 // do nothing
             }
@@ -919,8 +931,8 @@ public class SimpleAMQQueueTest extends 
         // latch to wait for message receipt
         final CountDownLatch latch = new CountDownLatch(messageNumber -1);
 
-        // create a subscription
-        MockSubscription subscription = new MockSubscription()
+        // create a consumer
+        MockConsumer consumer = new MockConsumer()
         {
             /**
              * Send a message and decrement latch
@@ -937,7 +949,11 @@ public class SimpleAMQQueueTest extends 
         try
         {
             // subscribe
-            testQueue.registerSubscription(subscription, null, entries.get(0).getMessage().getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+            testQueue.addConsumer(consumer,
+                                  null,
+                                  entries.get(0).getMessage().getClass(),
+                                  "test",
+                                  EnumSet.noneOf(Consumer.Option.class));
 
             // process queue
             testQueue.processQueue(new QueueRunner(testQueue)
@@ -962,11 +978,11 @@ public class SimpleAMQQueueTest extends 
             Thread.currentThread().interrupt();
         }
         List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3));
-        verifyReceivedMessages(expected, subscription.getMessages());
+        verifyReceivedMessages(expected, consumer.getMessages());
     }
 
     /**
-     * Tests that entry in dequeued state are not enqueued and not delivered to subscription
+     * Tests that entry in dequeued state are not enqueued and not delivered to consumer
      */
     public void testEnqueueDequeuedEntry()
     {
@@ -1002,7 +1018,7 @@ public class SimpleAMQQueueTest extends 
                                     }
 
                                     @Override
-                                    public boolean acquire(Subscription sub)
+                                    public boolean acquire(Consumer sub)
                                     {
                                         if(message.getMessageNumber() % 2 == 0)
                                         {
@@ -1018,24 +1034,28 @@ public class SimpleAMQQueueTest extends 
                         };
                     }
                 }, null);
-        // create a subscription
-        MockSubscription subscription = new MockSubscription();
+        // create a consumer
+        MockConsumer consumer = new MockConsumer();
 
-        // register subscription
+        // register consumer
         try
         {
-            queue.registerSubscription(subscription, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+            queue.addConsumer(consumer,
+                              null,
+                              createMessage(-1l).getClass(),
+                              "test",
+                              EnumSet.noneOf(Consumer.Option.class));
         }
         catch (AMQException e)
         {
-            fail("Failure to register subscription:" + e.getMessage());
+            fail("Failure to register consumer:" + e.getMessage());
         }
 
         // put test messages into a queue
         putGivenNumberOfMessages(queue, 4);
 
         // assert received messages
-        List<MessageInstance> messages = subscription.getMessages();
+        List<MessageInstance> messages = consumer.getMessages();
         assertEquals("Only 2 messages should be returned", 2, messages.size());
         assertEquals("ID of first message should be 1", 1l,
                 (messages.get(0).getMessage()).getMessageNumber());
@@ -1048,52 +1068,60 @@ public class SimpleAMQQueueTest extends 
         final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false,
                 "testOwner", false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
 
-        //verify adding an active subscription increases the count
-        final MockSubscription subscription1 = new MockSubscription();
-        subscription1.setActive(true);
-        subscription1.setState(SubscriptionTarget.State.ACTIVE);
+        //verify adding an active consumer increases the count
+        final MockConsumer consumer1 = new MockConsumer();
+        consumer1.setActive(true);
+        consumer1.setState(ConsumerTarget.State.ACTIVE);
         assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
-        queue.registerSubscription(subscription1, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+        queue.addConsumer(consumer1,
+                          null,
+                          createMessage(-1l).getClass(),
+                          "test",
+                          EnumSet.noneOf(Consumer.Option.class));
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
-        //verify adding an inactive subscription doesn't increase the count
-        final MockSubscription subscription2 = new MockSubscription();
-        subscription2.setActive(false);
-        subscription2.setState(SubscriptionTarget.State.SUSPENDED);
+        //verify adding an inactive consumer doesn't increase the count
+        final MockConsumer consumer2 = new MockConsumer();
+        consumer2.setActive(false);
+        consumer2.setState(ConsumerTarget.State.SUSPENDED);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
-        queue.registerSubscription(subscription2, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+        queue.addConsumer(consumer2,
+                          null,
+                          createMessage(-1l).getClass(),
+                          "test",
+                          EnumSet.noneOf(Consumer.Option.class));
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
         //verify behaviour in face of expected state changes:
 
-        //verify a subscription going suspended->active increases the count
-        subscription2.setState(SubscriptionTarget.State.ACTIVE);
+        //verify a consumer going suspended->active increases the count
+        consumer2.setState(ConsumerTarget.State.ACTIVE);
         assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
 
-        //verify a subscription going active->suspended decreases the count
-        subscription2.setState(SubscriptionTarget.State.SUSPENDED);
+        //verify a consumer going active->suspended decreases the count
+        consumer2.setState(ConsumerTarget.State.SUSPENDED);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
-        //verify a subscription going suspended->closed doesn't change the count
-        subscription2.setState(SubscriptionTarget.State.CLOSED);
+        //verify a consumer going suspended->closed doesn't change the count
+        consumer2.setState(ConsumerTarget.State.CLOSED);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
-        //verify a subscription going active->active doesn't change the count
-        subscription1.setState(SubscriptionTarget.State.ACTIVE);
+        //verify a consumer going active->active doesn't change the count
+        consumer1.setState(ConsumerTarget.State.ACTIVE);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
-        subscription1.setState(SubscriptionTarget.State.SUSPENDED);
+        consumer1.setState(ConsumerTarget.State.SUSPENDED);
         assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
 
-        //verify a subscription going suspended->suspended doesn't change the count
-        subscription1.setState(SubscriptionTarget.State.SUSPENDED);
+        //verify a consumer going suspended->suspended doesn't change the count
+        consumer1.setState(ConsumerTarget.State.SUSPENDED);
         assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
 
-        subscription1.setState(SubscriptionTarget.State.ACTIVE);
+        consumer1.setState(ConsumerTarget.State.ACTIVE);
         assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
 
-        //verify a subscription going active->closed  decreases the count
-        subscription1.setState(SubscriptionTarget.State.CLOSED);
+        //verify a consumer going active->closed  decreases the count
+        consumer1.setState(ConsumerTarget.State.CLOSED);
         assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
 
     }
@@ -1248,9 +1276,9 @@ public class SimpleAMQQueueTest extends 
         return _queue;
     }
 
-    public MockSubscription getSubscription()
+    public MockConsumer getConsumer()
     {
-        return _subscriptionTarget;
+        return _consumerTarget;
     }
 
     public Map<String,Object> getArguments()

Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (from r1564601, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java&r1=1564601&r2=1564703&rev=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Wed Feb  5 10:29:55 2014
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -34,8 +33,8 @@ import org.apache.qpid.server.protocol.M
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
@@ -46,7 +45,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener
+public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
 {
 
     private static final Option[] BATCHED = new Option[] { Option.BATCH };
@@ -69,16 +68,16 @@ public class SubscriptionTarget_0_10 ext
     private final Map<String, Object> _arguments;
     private int _deferredMessageCredit;
     private long _deferredSizeCredit;
-    private Subscription _subscription;
+    private Consumer _consumer;
 
 
-    public SubscriptionTarget_0_10(ServerSession session,
-                                   String name,
-                                   MessageAcceptMode acceptMode,
-                                   MessageAcquireMode acquireMode,
-                                   MessageFlowMode flowMode,
-                                   FlowCreditManager_0_10 creditManager,
-                                   Map<String, Object> arguments)
+    public ConsumerTarget_0_10(ServerSession session,
+                               String name,
+                               MessageAcceptMode acceptMode,
+                               MessageAcquireMode acquireMode,
+                               MessageFlowMode flowMode,
+                               FlowCreditManager_0_10 creditManager,
+                               Map<String, Object> arguments)
     {
         super(State.SUSPENDED);
         _session = session;
@@ -93,9 +92,9 @@ public class SubscriptionTarget_0_10 ext
         _name = name;
     }
 
-    public Subscription getSubscription()
+    public Consumer getConsumer()
     {
-        return _subscription;
+        return _consumer;
     }
 
     public boolean isSuspended()
@@ -108,7 +107,7 @@ public class SubscriptionTarget_0_10 ext
         boolean closed = false;
         State state = getState();
 
-        getSubscription().getSendLock();
+        getConsumer().getSendLock();
         try
         {
             while(!closed && state != State.CLOSED)
@@ -124,7 +123,7 @@ public class SubscriptionTarget_0_10 ext
             }
         finally
         {
-            getSubscription().releaseSendLock();
+            getConsumer().releaseSendLock();
         }
 
         return closed;
@@ -255,8 +254,8 @@ public class SubscriptionTarget_0_10 ext
         Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
 
 
-        xfr = batch ? new MessageTransfer(getSubscription().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
-                    : new MessageTransfer(getSubscription().getName(),_acceptMode,_acquireMode,header,msg.getBody());
+        xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
+                    : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody());
 
         if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
         {
@@ -353,7 +352,7 @@ public class SubscriptionTarget_0_10 ext
     {
         entry.setRedelivered();
         entry.routeToAlternate(null, null);
-        if(entry.isAcquiredBy(getSubscription()))
+        if(entry.isAcquiredBy(getConsumer()))
         {
             entry.delete();
         }
@@ -450,7 +449,7 @@ public class SubscriptionTarget_0_10 ext
     {
         try
         {
-            getSubscription().getSendLock();
+            getConsumer().getSendLock();
 
             updateState(State.ACTIVE, State.SUSPENDED);
             _stopped.set(true);
@@ -459,7 +458,7 @@ public class SubscriptionTarget_0_10 ext
         }
         finally
         {
-            getSubscription().releaseSendLock();
+            getConsumer().releaseSendLock();
         }
     }
 
@@ -519,7 +518,7 @@ public class SubscriptionTarget_0_10 ext
     public void acknowledge(MessageInstance entry)
     {
         // TODO Fix Store Context / cleanup
-        if(entry.isAcquiredBy(getSubscription()))
+        if(entry.isAcquiredBy(getConsumer()))
         {
             _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
             _unacknowledgedCount.decrementAndGet();
@@ -530,7 +529,7 @@ public class SubscriptionTarget_0_10 ext
     public void flush() throws AMQException
     {
         flushCreditState(true);
-        getSubscription().flush();
+        getConsumer().flush();
         stop();
     }
 
@@ -560,13 +559,13 @@ public class SubscriptionTarget_0_10 ext
 
 
     @Override
-    public void subscriptionRegistered(final Subscription sub)
+    public void consumerAdded(final Consumer sub)
     {
-        _subscription = sub;
+        _consumer = sub;
     }
 
     @Override
-    public void subscriptionRemoved(final Subscription sub)
+    public void consumerRemoved(final Consumer sub)
     {
     }
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Wed Feb  5 10:29:55 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
 
 
 class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -32,9 +31,9 @@ class ExplicitAcceptDispositionChangeLis
 
 
     private final MessageInstance _entry;
-    private final SubscriptionTarget_0_10 _target;
+    private final ConsumerTarget_0_10 _target;
 
-    public ExplicitAcceptDispositionChangeListener(MessageInstance entry, SubscriptionTarget_0_10 target)
+    public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
     {
         _entry = entry;
         _target = target;
@@ -42,7 +41,7 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onAccept()
     {
-        if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
+        if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
         {
             _target.getSessionModel().acknowledge(_target, _entry);
         }
@@ -55,7 +54,7 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onRelease(boolean setRedelivered)
     {
-        if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
+        if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
         {
             _target.release(_entry, setRedelivered);
         }
@@ -67,7 +66,7 @@ class ExplicitAcceptDispositionChangeLis
 
     public void onReject()
     {
-        if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
+        if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
         {
             _target.reject(_entry);
         }
@@ -80,7 +79,7 @@ class ExplicitAcceptDispositionChangeLis
 
     public boolean acquire()
     {
-        return _entry.acquire(_target.getSubscription());
+        return _entry.acquire(_target.getConsumer());
     }
 
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Wed Feb  5 10:29:55 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
 
 class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
 {
@@ -31,9 +30,9 @@ class ImplicitAcceptDispositionChangeLis
 
 
     private final MessageInstance _entry;
-    private SubscriptionTarget_0_10 _target;
+    private ConsumerTarget_0_10 _target;
 
-    public ImplicitAcceptDispositionChangeListener(MessageInstance entry, SubscriptionTarget_0_10 target)
+    public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
     {
         _entry = entry;
         _target = target;
@@ -46,7 +45,7 @@ class ImplicitAcceptDispositionChangeLis
 
     public void onRelease(boolean setRedelivered)
     {
-        if(_entry.isAcquiredBy(_target.getSubscription()))
+        if(_entry.isAcquiredBy(_target.getConsumer()))
         {
             _target.release(_entry, setRedelivered);
         }
@@ -58,7 +57,7 @@ class ImplicitAcceptDispositionChangeLis
 
     public void onReject()
     {
-        if(_entry.isAcquiredBy(_target.getSubscription()))
+        if(_entry.isAcquiredBy(_target.getConsumer()))
         {
             _target.reject(_entry);
         }
@@ -71,7 +70,7 @@ class ImplicitAcceptDispositionChangeLis
 
     public boolean acquire()
     {
-        boolean acquired = _entry.acquire(_target.getSubscription());
+        boolean acquired = _entry.acquire(_target.getConsumer());
         if(acquired)
         {
             _target.recordUnacknowledged(_entry);

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Wed Feb  5 10:29:55 2014
@@ -22,17 +22,16 @@
 package org.apache.qpid.server.protocol.v0_10;
 
 import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.transport.Method;
 
 public class MessageAcceptCompletionListener implements Method.CompletionListener
 {
-    private final SubscriptionTarget_0_10 _sub;
+    private final ConsumerTarget_0_10 _sub;
     private final MessageInstance _entry;
     private final ServerSession _session;
     private boolean _restoreCredit;
 
-    public MessageAcceptCompletionListener(SubscriptionTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
+    public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
     {
         super();
         _sub = sub;
@@ -47,7 +46,7 @@ public class MessageAcceptCompletionList
         {
             _sub.restoreCredit(_entry.getMessage());
         }
-        if(_entry.isAcquiredBy(_sub.getSubscription()))
+        if(_entry.isAcquiredBy(_sub.getConsumer()))
         {
             _session.acknowledge(_sub, _entry);
         }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Wed Feb  5 10:29:55 2014
@@ -282,8 +282,8 @@ public class ServerConnectionDelegate ex
     private void stopAllSubscriptions(Connection conn, SessionDetach dtc)
     {
         final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
-        final Collection<SubscriptionTarget_0_10> subs = ssn.getSubscriptions();
-        for (SubscriptionTarget_0_10 subscription_0_10 : subs)
+        final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
+        for (ConsumerTarget_0_10 subscription_0_10 : subs)
         {
             subscription_0_10.stop();
         }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Wed Feb  5 10:29:55 2014
@@ -63,7 +63,6 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.AlreadyKnownDtxException;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.DistributedTransaction;
@@ -137,7 +136,7 @@ public class ServerSession extends Sessi
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
 
-    private Map<String, SubscriptionTarget_0_10> _subscriptions = new ConcurrentHashMap<String, SubscriptionTarget_0_10>();
+    private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
 
     private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
 
@@ -400,7 +399,7 @@ public class ServerSession extends Sessi
         // Broker shouldn't block awaiting close - thus do override this method to do nothing
     }
 
-    public void acknowledge(final SubscriptionTarget_0_10 sub, final MessageInstance entry)
+    public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
     {
         _transaction.dequeue(entry.getOwningResource(), entry.getMessage(),
                              new ServerTransaction.Action()
@@ -421,22 +420,22 @@ public class ServerSession extends Sessi
                              });
     }
 
-    public Collection<SubscriptionTarget_0_10> getSubscriptions()
+    public Collection<ConsumerTarget_0_10> getSubscriptions()
     {
         return _subscriptions.values();
     }
 
-    public void register(String destination, SubscriptionTarget_0_10 sub)
+    public void register(String destination, ConsumerTarget_0_10 sub)
     {
         _subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub);
     }
 
-    public SubscriptionTarget_0_10 getSubscription(String destination)
+    public ConsumerTarget_0_10 getSubscription(String destination)
     {
         return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
     }
 
-    public void unregister(SubscriptionTarget_0_10 sub)
+    public void unregister(ConsumerTarget_0_10 sub)
     {
         _subscriptions.remove(sub.getName());
         sub.close();
@@ -808,8 +807,8 @@ public class ServerSession extends Sessi
 
     void unregisterSubscriptions()
     {
-        final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
-        for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
+        final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+        for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
         {
             unregister(subscription_0_10);
         }
@@ -817,8 +816,8 @@ public class ServerSession extends Sessi
 
     void stopSubscriptions()
     {
-        final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
-        for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
+        final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+        for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
         {
             subscription_0_10.stop();
         }
@@ -827,8 +826,8 @@ public class ServerSession extends Sessi
 
     public void receivedComplete()
     {
-        final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
-        for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
+        final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+        for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
         {
             subscription_0_10.flushCreditState(false);
         }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Wed Feb  5 10:29:55 2014
@@ -46,7 +46,7 @@ import org.apache.qpid.server.store.Dura
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AlreadyKnownDtxException;
 import org.apache.qpid.server.txn.DtxNotSelectedException;
 import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -257,7 +257,7 @@ public class ServerSessionDelegate exten
                         return;
                     }
 
-                    SubscriptionTarget_0_10 target = new SubscriptionTarget_0_10((ServerSession)session, destination,
+                    ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
                                                                                  method.getAcceptMode(),
                                                                                  method.getAcquireMode(),
                                                                                  MessageFlowMode.WINDOW,
@@ -268,31 +268,31 @@ public class ServerSessionDelegate exten
                     ((ServerSession)session).register(destination, target);
                     try
                     {
-                        EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+                        EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
                         if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
                         {
-                            options.add(Subscription.Option.ACQUIRES);
+                            options.add(Consumer.Option.ACQUIRES);
                         }
                         if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
                         {
-                            options.add(Subscription.Option.SEES_REQUEUES);
+                            options.add(Consumer.Option.SEES_REQUEUES);
                         }
                         if(method.getExclusive())
                         {
-                            options.add(Subscription.Option.EXCLUSIVE);
+                            options.add(Consumer.Option.EXCLUSIVE);
                         }
-                        Subscription sub =
-                                queue.registerSubscription(target,
-                                                           filterManager,
-                                                           MessageTransferMessage.class,
-                                                           destination,
-                                                           options);
+                        Consumer sub =
+                                queue.addConsumer(target,
+                                                  filterManager,
+                                                  MessageTransferMessage.class,
+                                                  destination,
+                                                  options);
                     }
-                    catch (AMQQueue.ExistingExclusiveSubscription existing)
+                    catch (AMQQueue.ExistingExclusiveConsumer existing)
                     {
                         exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
                     }
-                    catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive)
+                    catch (AMQQueue.ExistingConsumerPreventsExclusive exclusive)
                     {
                         exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
                     }
@@ -405,7 +405,7 @@ public class ServerSessionDelegate exten
     {
         String destination = method.getDestination();
 
-        SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
 
         if(sub == null)
         {
@@ -422,7 +422,7 @@ public class ServerSessionDelegate exten
     {
         String destination = method.getDestination();
 
-        SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
 
         if(sub == null)
         {
@@ -1476,7 +1476,7 @@ public class ServerSessionDelegate exten
     {
         String destination = sfm.getDestination();
 
-        SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
 
         if(sub == null)
         {
@@ -1493,7 +1493,7 @@ public class ServerSessionDelegate exten
     {
         String destination = stop.getDestination();
 
-        SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
 
         if(sub == null)
         {
@@ -1511,7 +1511,7 @@ public class ServerSessionDelegate exten
     {
         String destination = flow.getDestination();
 
-        SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+        ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
 
         if(sub == null)
         {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Feb  5 10:29:55 2014
@@ -69,8 +69,7 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -113,7 +112,7 @@ public class AMQChannel implements AMQSe
     private IncomingMessage _currentMessage;
 
     /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
-    private final Map<AMQShortString, SubscriptionTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, SubscriptionTarget_0_8>();
+    private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
 
     private final MessageStore _messageStore;
 
@@ -488,10 +487,10 @@ public class AMQChannel implements AMQSe
     }
 
 
-    public Subscription getSubscription(AMQShortString tag)
+    public Consumer getSubscription(AMQShortString tag)
     {
-        final SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
-        return target == null ? null : target.getSubscription();
+        final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
+        return target == null ? null : target.getConsumer();
     }
 
     /**
@@ -522,30 +521,30 @@ public class AMQChannel implements AMQSe
             throw new AMQException("Consumer already exists with same tag: " + tag);
         }
 
-        SubscriptionTarget_0_8 target;
-        EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+        ConsumerTarget_0_8 target;
+        EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
 
         if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
         {
-            target = SubscriptionTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
-            options.add(Subscription.Option.TRANSIENT);
+            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+            options.add(Consumer.Option.TRANSIENT);
         }
         else if(acks)
         {
-            target = SubscriptionTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
-            options.add(Subscription.Option.ACQUIRES);
-            options.add(Subscription.Option.SEES_REQUEUES);
+            target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+            options.add(Consumer.Option.ACQUIRES);
+            options.add(Consumer.Option.SEES_REQUEUES);
         }
         else
         {
-            target = SubscriptionTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
-            options.add(Subscription.Option.ACQUIRES);
-            options.add(Subscription.Option.SEES_REQUEUES);
+            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+            options.add(Consumer.Option.ACQUIRES);
+            options.add(Consumer.Option.SEES_REQUEUES);
         }
 
         if(exclusive)
         {
-            options.add(Subscription.Option.EXCLUSIVE);
+            options.add(Consumer.Option.EXCLUSIVE);
         }
 
         // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
@@ -557,8 +556,12 @@ public class AMQChannel implements AMQSe
 
         try
         {
-            Subscription sub =
-                    queue.registerSubscription(target, FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), AMQMessage.class, AMQShortString.toString(tag), options);
+            Consumer sub =
+                    queue.addConsumer(target,
+                                      FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
+                                      AMQMessage.class,
+                                      AMQShortString.toString(tag),
+                                      options);
         }
         catch (AMQException e)
         {
@@ -582,8 +585,8 @@ public class AMQChannel implements AMQSe
     public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
     {
 
-        SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
-        Subscription sub = target == null ? null : target.getSubscription();
+        ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
+        Consumer sub = target == null ? null : target.getConsumer();
         if (sub != null)
         {
             sub.close();
@@ -651,14 +654,14 @@ public class AMQChannel implements AMQSe
             }
         }
 
-        for (Map.Entry<AMQShortString, SubscriptionTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
+        for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
         {
             if (_logger.isInfoEnabled())
             {
                 _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
             }
 
-            Subscription sub = me.getValue().getSubscription();
+            Consumer sub = me.getValue().getConsumer();
 
 
             sub.close();
@@ -674,14 +677,14 @@ public class AMQChannel implements AMQSe
      * @param entry       the record of the message on the queue that was delivered
      * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
      *                    delivery tag)
-     * @param subscription The consumer that is to acknowledge this message.
+     * @param consumer The consumer that is to acknowledge this message.
      */
-    public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Subscription subscription)
+    public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer)
     {
         if (_logger.isDebugEnabled())
         {
                     _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
-                               + ") for " + subscription);
+                               + ") for " + consumer);
 
         }
 
@@ -928,9 +931,9 @@ public class AMQChannel implements AMQSe
             if (wasSuspended)
             {
                 // may need to deliver queued messages
-                for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values())
+                for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
                 {
-                    s.getSubscription().externalStateChange();
+                    s.getConsumer().externalStateChange();
                 }
             }
 
@@ -944,15 +947,15 @@ public class AMQChannel implements AMQSe
             if (!wasSuspended)
             {
                 // may need to deliver queued messages
-                for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values())
+                for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
                 {
                     try
                     {
-                        s.getSubscription().getSendLock();
+                        s.getConsumer().getSendLock();
                     }
                     finally
                     {
-                        s.getSubscription().releaseSendLock();
+                        s.getConsumer().releaseSendLock();
                     }
                 }
             }
@@ -1029,10 +1032,10 @@ public class AMQChannel implements AMQSe
         boolean requiresSuspend = _suspended.compareAndSet(false,true);
 
         // ensure all subscriptions have seen the change to the channel state
-        for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
+        for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
         {
-            sub.getSubscription().getSendLock();
-            sub.getSubscription().releaseSendLock();
+            sub.getConsumer().getSendLock();
+            sub.getConsumer().releaseSendLock();
         }
 
         try
@@ -1052,7 +1055,7 @@ public class AMQChannel implements AMQSe
 
         for(MessageInstance entry : _resendList)
         {
-            Subscription sub = entry.getDeliveredSubscription();
+            Consumer sub = entry.getDeliveredConsumer();
             if(sub == null || sub.isClosed())
             {
                 entry.release();
@@ -1067,9 +1070,9 @@ public class AMQChannel implements AMQSe
         if(requiresSuspend)
         {
             _suspended.set(false);
-            for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
+            for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
             {
-                sub.getSubscription().externalStateChange();
+                sub.getConsumer().externalStateChange();
             }
 
         }
@@ -1125,7 +1128,7 @@ public class AMQChannel implements AMQSe
     private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
         {
 
-            public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag)
+            public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
             {
                 addUnacknowledgedMessage(entry, deliveryTag, sub);
             }
@@ -1472,7 +1475,7 @@ public class AMQChannel implements AMQSe
         else
         {
             final ServerMessage msg = rejectedQueueEntry.getMessage();
-            final Subscription sub = rejectedQueueEntry.getDeliveredSubscription();
+            final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
 
             int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>()
                 {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Wed Feb  5 10:29:55 2014
@@ -94,7 +94,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
@@ -1668,7 +1668,7 @@ public class AMQProtocolEngine implement
         }
 
         @Override
-        public void deliverToClient(final Subscription sub, final ServerMessage message,
+        public void deliverToClient(final Consumer sub, final ServerMessage message,
                                     final InstanceProperties props, final long deliveryTag)
                 throws AMQException
         {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org