You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/05 11:29:57 UTC
svn commit: r1564703 [3/4] - in
/qpid/branches/java-broker-amqp-1-0-management/java:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/logging/actors/
broker-core/src/main/java/org/apache/qpid/s...
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Wed Feb 5 10:29:55 2014
@@ -29,8 +29,8 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -207,15 +207,15 @@ public class MockAMQQueue implements AMQ
}
@Override
- public Subscription registerSubscription(final SubscriptionTarget target,
- final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final String consumerName,
- final EnumSet<Subscription.Option> options) throws AMQException
- {
- return new QueueSubscription(filters, messageClass, options.contains(Subscription.Option.ACQUIRES),
- options.contains(Subscription.Option.SEES_REQUEUES), consumerName,
- options.contains(Subscription.Option.TRANSIENT), target );
+ public Consumer addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ final EnumSet<Consumer.Option> options) throws AMQException
+ {
+ return new QueueConsumer(filters, messageClass, options.contains(Consumer.Option.ACQUIRES),
+ options.contains(Consumer.Option.SEES_REQUEUES), consumerName,
+ options.contains(Consumer.Option.TRANSIENT), target );
}
public String getName()
@@ -224,22 +224,18 @@ public class MockAMQQueue implements AMQ
}
- public void unregisterSubscription(Subscription subscription) throws AMQException
- {
-
- }
- public Collection<Subscription> getConsumers()
+ public Collection<Consumer> getConsumers()
{
return Collections.emptyList();
}
- public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
}
- public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
}
@@ -254,7 +250,7 @@ public class MockAMQQueue implements AMQ
return 0;
}
- public boolean hasExclusiveSubscriber()
+ public boolean hasExclusiveConsumer()
{
return false;
}
@@ -318,11 +314,11 @@ public class MockAMQQueue implements AMQ
{
}
- public void dequeue(QueueEntry entry, Subscription sub)
+ public void dequeue(QueueEntry entry, Consumer sub)
{
}
- public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException
+ public boolean resend(QueueEntry entry, Consumer consumer) throws AMQException
{
return false;
}
@@ -431,12 +427,12 @@ public class MockAMQQueue implements AMQ
return null;
}
- public void flushSubscription(Subscription sub) throws AMQException
+ public void flushConsumer(Consumer sub) throws AMQException
{
}
- public void deliverAsync(Subscription sub)
+ public void deliverAsync(Consumer sub)
{
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Wed Feb 5 10:29:55 2014
@@ -26,7 +26,7 @@ import org.apache.qpid.server.message.AM
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
@@ -41,7 +41,7 @@ public class MockQueueEntry implements Q
return false;
}
- public boolean acquire(Subscription sub)
+ public boolean acquire(Consumer sub)
{
return false;
}
@@ -52,12 +52,12 @@ public class MockQueueEntry implements Q
return 0;
}
- public boolean acquiredBySubscription()
+ public boolean acquiredByConsumer()
{
return false;
}
- public boolean isAcquiredBy(Subscription subscription)
+ public boolean isAcquiredBy(Consumer consumer)
{
return false;
}
@@ -87,7 +87,7 @@ public class MockQueueEntry implements Q
return false;
}
- public Subscription getDeliveredSubscription()
+ public Consumer getDeliveredConsumer()
{
return null;
}
@@ -125,7 +125,7 @@ public class MockQueueEntry implements Q
}
- public boolean isRejectedBy(Subscription subscription)
+ public boolean isRejectedBy(Consumer consumer)
{
return false;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Wed Feb 5 10:29:55 2014
@@ -21,10 +21,11 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageInstance.EntryState;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import java.lang.reflect.Field;
@@ -112,17 +113,17 @@ public abstract class QueueEntryImplTest
*/
private void acquire()
{
- _queueEntry.acquire(newMockSubscription());
+ _queueEntry.acquire(newMockConsumer());
assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
_queueEntry.isAcquired());
}
- private Subscription newMockSubscription()
+ private Consumer newMockConsumer()
{
- final Subscription subscription = mock(Subscription.class);
- when(subscription.getOwningState()).thenReturn(new QueueEntry.SubscriptionAcquiredState(subscription));
- when(subscription.getSubscriptionID()).thenReturn(Subscription.SUB_ID_GENERATOR.getAndIncrement());
- return subscription;
+ final Consumer consumer = mock(Consumer.class);
+ when(consumer.getOwningState()).thenReturn(new MessageInstance.ConsumerAcquiredState(consumer));
+ when(consumer.getId()).thenReturn(Consumer.SUB_ID_GENERATOR.getAndIncrement());
+ return consumer;
}
/**
@@ -147,34 +148,34 @@ public abstract class QueueEntryImplTest
}
/**
- * Tests rejecting a queue entry records the Subscription ID
- * for later verification by isRejectedBy(subscriptionId).
+ * Tests rejecting a queue entry records the Consumer ID
+ * for later verification by isRejectedBy(consumerId).
*/
public void testRejectAndRejectedBy()
{
- Subscription sub = newMockSubscription();
+ Consumer sub = newMockConsumer();
- assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
- assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired());
+ assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
+ assertFalse("Queue entry should not yet have been acquired by a consumer", _queueEntry.isAcquired());
- //acquire, reject, and release the message using the subscription
+ //acquire, reject, and release the message using the consumer
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub));
_queueEntry.reject();
_queueEntry.release();
//verify the rejection is recorded
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
+ assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
- //repeat rejection using a second subscription
- Subscription sub2 = newMockSubscription();
+ //repeat rejection using a second consumer
+ Consumer sub2 = newMockConsumer();
- assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2));
+ assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
_queueEntry.reject();
- //verify it still records being rejected by both subscriptions
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub));
- assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2));
+ //verify it still records being rejected by both consumers
+ assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
+ assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
}
/**
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Wed Feb 5 10:29:55 2014
@@ -45,9 +45,9 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.MockConsumer;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -69,8 +69,8 @@ public class SimpleAMQQueueTest extends
private String _owner = "owner";
private String _routingKey = "routing key";
private DirectExchange _exchange;
- private MockSubscription _subscriptionTarget = new MockSubscription();
- private QueueSubscription _subscription;
+ private MockConsumer _consumerTarget = new MockConsumer();
+ private QueueConsumer _consumer;
private Map<String,Object> _arguments = null;
@Override
@@ -162,13 +162,13 @@ public class SimpleAMQQueueTest extends
}
- public void testRegisterSubscriptionThenEnqueueMessage() throws AMQException
+ public void testRegisterConsumerThenEnqueueMessage() throws AMQException
{
ServerMessage messageA = createMessage(new Long(24));
- // Check adding a subscription adds it to the queue
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ // Check adding a consumer adds it to the queue
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
@@ -183,49 +183,49 @@ public class SimpleAMQQueueTest extends
catch(InterruptedException e)
{
}
- assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull(_subscription.getQueueContext().getReleasedEntry());
+ assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
+ assertNull(_consumer.getQueueContext().getReleasedEntry());
- // Check removing the subscription removes it's information from the queue
- _subscription.close();
- assertTrue("Subscription still had queue", _subscriptionTarget.isClosed());
+ // Check removing the consumer removes its information from the queue
+ _consumer.close();
+ assertTrue("Consumer still had queue", _consumerTarget.isClosed());
assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
assertFalse("Queue still has active consumer",
1 == _queue.getActiveConsumerCount());
ServerMessage messageB = createMessage(new Long (25));
_queue.enqueue(messageB);
- assertNull(_subscription.getQueueContext());
+ assertNull(_consumer.getQueueContext());
}
- public void testEnqueueMessageThenRegisterSubscription() throws AMQException, InterruptedException
+ public void testEnqueueMessageThenRegisterConsumer() throws AMQException, InterruptedException
{
ServerMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA);
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
Thread.sleep(150);
- assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after an enqueue",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests enqueuing two messages.
*/
- public void testEnqueueTwoMessagesThenRegisterSubscription() throws Exception
+ public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
_queue.enqueue(messageA);
_queue.enqueue(messageB);
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
Thread.sleep(150);
- assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after enqueues",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
@@ -240,9 +240,9 @@ public class SimpleAMQQueueTest extends
ServerMessage messageC = createMessage(new Long(26));
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -261,7 +261,9 @@ public class SimpleAMQQueueTest extends
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 3,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -272,12 +274,14 @@ public class SimpleAMQQueueTest extends
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 4, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 4,
+ _consumerTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
@@ -289,9 +293,9 @@ public class SimpleAMQQueueTest extends
{
ServerMessage messageA = createMessage(new Long(24));
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.SEES_REQUEUES,
- Subscription.Option.ACQUIRES));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.SEES_REQUEUES,
+ Consumer.Option.ACQUIRES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -313,7 +317,9 @@ public class SimpleAMQQueueTest extends
int subFlushWaitTime = 150;
Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 1, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 1,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
/* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */
@@ -323,10 +329,12 @@ public class SimpleAMQQueueTest extends
Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
- assertEquals("Total number of messages sent should not have changed", 1, _subscriptionTarget.getMessages().size());
+ assertEquals("Total number of messages sent should not have changed",
+ 1,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
@@ -343,9 +351,9 @@ public class SimpleAMQQueueTest extends
ServerMessage messageB = createMessage(new Long(25));
ServerMessage messageC = createMessage(new Long(26));
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
@@ -364,7 +372,9 @@ public class SimpleAMQQueueTest extends
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 3, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 3,
+ _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
@@ -376,35 +386,37 @@ public class SimpleAMQQueueTest extends
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to subscription", 5, _subscriptionTarget.getMessages().size());
+ assertEquals("Unexpected total number of messages sent to consumer",
+ 5,
+ _consumerTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
- _subscription.getQueueContext().getReleasedEntry());
+ _consumer.getQueueContext().getReleasedEntry());
}
/**
- * Tests that a release requeues an entry for a queue with multiple subscriptions. Verifies that a
+ * Tests that a release requeues an entry for a queue with multiple consumers. Verifies that a
* requeue resends a message to a <i>single</i> subscriber.
*/
- public void testReleaseForQueueWithMultipleSubscriptions() throws Exception
+ public void testReleaseForQueueWithMultipleConsumers() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
- MockSubscription target1 = new MockSubscription();
- MockSubscription target2 = new MockSubscription();
+ MockConsumer target1 = new MockConsumer();
+ MockConsumer target2 = new MockConsumer();
- QueueSubscription subscription1 = _queue.registerSubscription(target1, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES));
+ QueueConsumer consumer1 = _queue.addConsumer(target1, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
- QueueSubscription subscription2 = _queue.registerSubscription(target2, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES,
- Subscription.Option.SEES_REQUEUES));
+ QueueConsumer consumer2 = _queue.addConsumer(target2, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
@@ -433,22 +445,22 @@ public class SimpleAMQQueueTest extends
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
- assertEquals("Unexpected total number of messages sent to both subscriptions after release",
+ assertEquals("Unexpected total number of messages sent to both consumers after release",
3,
target1.getMessages().size() + target2.getMessages().size());
assertNull("releasedEntry should be cleared after requeue processed",
- subscription1.getQueueContext().getReleasedEntry());
+ consumer1.getQueueContext().getReleasedEntry());
assertNull("releasedEntry should be cleared after requeue processed",
- subscription2.getQueueContext().getReleasedEntry());
+ consumer2.getQueueContext().getReleasedEntry());
}
public void testExclusiveConsumer() throws AMQException
{
ServerMessage messageA = createMessage(new Long(24));
- // Check adding an exclusive subscription adds it to the queue
+ // Check adding an exclusive consumer adds it to the queue
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.EXCLUSIVE));
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.EXCLUSIVE));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
@@ -464,16 +476,16 @@ public class SimpleAMQQueueTest extends
catch (InterruptedException e)
{
}
- assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
+ assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
- MockSubscription subB = new MockSubscription();
+ MockConsumer subB = new MockConsumer();
Exception ex = null;
try
{
- _queue.registerSubscription(subB, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _queue.addConsumer(subB, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
}
catch (AMQException e)
@@ -483,16 +495,16 @@ public class SimpleAMQQueueTest extends
assertNotNull(ex);
// Check we cannot add an exclusive subscriber to a queue with an
- // existing subscription
- _subscription.close();
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, messageA.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ // existing consumer
+ _consumer.close();
+ _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
try
{
- _subscription = _queue.registerSubscription(subB, null, messageA.getClass(), "test",
- EnumSet.of(Subscription.Option.EXCLUSIVE));
+ _consumer = _queue.addConsumer(subB, null, messageA.getClass(), "test",
+ EnumSet.of(Consumer.Option.EXCLUSIVE));
}
catch (AMQException e)
@@ -509,12 +521,12 @@ public class SimpleAMQQueueTest extends
_queue.setDeleteOnNoConsumers(true);
ServerMessage message = createMessage(new Long(25));
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
_queue.enqueue(message);
- _subscription.close();
- assertTrue("Queue was not deleted when subscription was removed",
+ _consumer.close();
+ assertTrue("Queue was not deleted when consumer was removed",
_queue.isDeleted());
}
@@ -523,13 +535,13 @@ public class SimpleAMQQueueTest extends
Long id = new Long(26);
ServerMessage message = createMessage(id);
- _subscription = _queue.registerSubscription(_subscriptionTarget, null, message.getClass(), "test",
- EnumSet.noneOf(Subscription.Option.class));
+ _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
+ EnumSet.noneOf(Consumer.Option.class));
_queue.enqueue(message);
- QueueEntry entry = _subscription.getQueueContext().getLastSeenEntry();
+ QueueEntry entry = _consumer.getQueueContext().getLastSeenEntry();
entry.setRedelivered();
- _subscription.resend(entry);
+ _consumer.resend(entry);
}
@@ -656,19 +668,19 @@ public class SimpleAMQQueueTest extends
/**
* processQueue() is used when asynchronously delivering messages to
- * subscriptions which could not be delivered immediately during the
+ * consumers which could not be delivered immediately during the
* enqueue() operation.
*
* A defect within the method would mean that delivery of these messages may
* not occur should the Runner stop before all messages have been processed.
* Such a defect was discovered when Selectors were used such that one and
- * only one subscription can/will accept any given messages, but multiple
- * subscriptions are present, and one of the earlier subscriptions receives
+ * only one consumer can/will accept any given messages, but multiple
+ * consumers are present, and one of the earlier consumers receives
* more messages than the others.
*
* This test is to validate that the processQueue() method is able to
* correctly deliver all of the messages present for asynchronous delivery
- * to subscriptions in such a scenario.
+ * to consumers in such a scenario.
*/
public void testProcessQueueWithUniqueSelectors() throws Exception
{
@@ -677,10 +689,10 @@ public class SimpleAMQQueueTest extends
false, false, _virtualHost, factory, null)
{
@Override
- public void deliverAsync(QueueSubscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
// do nothing, i.e prevent deliveries by the SubFlushRunner
- // when registering the new subscriptions
+ // when registering the new consumers
}
};
@@ -696,28 +708,28 @@ public class SimpleAMQQueueTest extends
QueueEntry msg4 = list.add(createMessage(4L));
QueueEntry msg5 = list.add(createMessage(5L));
- // Create lists of the entries each subscription should be interested
- // in.Bias over 50% of the messages to the first subscription so that
- // the later subscriptions reject them and report being done before
- // the first subscription as the processQueue method proceeds.
+ // Create lists of the entries each consumer should be interested
+ // in. Bias over 50% of the messages to the first consumer so that
+ // the later consumers reject them and report being done before
+ // the first consumer as the processQueue method proceeds.
List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3);
List<String> msgListSub2 = createEntriesList(msg4);
List<String> msgListSub3 = createEntriesList(msg5);
- MockSubscription sub1 = new MockSubscription(msgListSub1);
- MockSubscription sub2 = new MockSubscription(msgListSub2);
- MockSubscription sub3 = new MockSubscription(msgListSub3);
-
- // register the subscriptions
- testQueue.registerSubscription(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
- testQueue.registerSubscription(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
- testQueue.registerSubscription(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
- EnumSet.of(Subscription.Option.ACQUIRES, Subscription.Option.SEES_REQUEUES));
+ MockConsumer sub1 = new MockConsumer(msgListSub1);
+ MockConsumer sub2 = new MockConsumer(msgListSub2);
+ MockConsumer sub3 = new MockConsumer(msgListSub3);
+
+ // register the consumers
+ testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+ testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
+ testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
+ EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
//check that no messages have been delivered to the
- //subscriptions during registration
+ //consumers during registration
assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
@@ -904,7 +916,7 @@ public class SimpleAMQQueueTest extends
false, "testOwner", false, false, _virtualHost, null)
{
@Override
- public void deliverAsync(QueueSubscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
// do nothing
}
@@ -919,8 +931,8 @@ public class SimpleAMQQueueTest extends
// latch to wait for message receipt
final CountDownLatch latch = new CountDownLatch(messageNumber -1);
- // create a subscription
- MockSubscription subscription = new MockSubscription()
+ // create a consumer
+ MockConsumer consumer = new MockConsumer()
{
/**
* Send a message and decrement latch
@@ -937,7 +949,11 @@ public class SimpleAMQQueueTest extends
try
{
// subscribe
- testQueue.registerSubscription(subscription, null, entries.get(0).getMessage().getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+ testQueue.addConsumer(consumer,
+ null,
+ entries.get(0).getMessage().getClass(),
+ "test",
+ EnumSet.noneOf(Consumer.Option.class));
// process queue
testQueue.processQueue(new QueueRunner(testQueue)
@@ -962,11 +978,11 @@ public class SimpleAMQQueueTest extends
Thread.currentThread().interrupt();
}
List<MessageInstance> expected = Arrays.asList((MessageInstance)entries.get(0), entries.get(2), entries.get(3));
- verifyReceivedMessages(expected, subscription.getMessages());
+ verifyReceivedMessages(expected, consumer.getMessages());
}
/**
- * Tests that entry in dequeued state are not enqueued and not delivered to subscription
+ * Tests that an entry in the dequeued state is not enqueued and not delivered to the consumer
*/
public void testEnqueueDequeuedEntry()
{
@@ -1002,7 +1018,7 @@ public class SimpleAMQQueueTest extends
}
@Override
- public boolean acquire(Subscription sub)
+ public boolean acquire(Consumer sub)
{
if(message.getMessageNumber() % 2 == 0)
{
@@ -1018,24 +1034,28 @@ public class SimpleAMQQueueTest extends
};
}
}, null);
- // create a subscription
- MockSubscription subscription = new MockSubscription();
+ // create a consumer
+ MockConsumer consumer = new MockConsumer();
- // register subscription
+ // register consumer
try
{
- queue.registerSubscription(subscription, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+ queue.addConsumer(consumer,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.noneOf(Consumer.Option.class));
}
catch (AMQException e)
{
- fail("Failure to register subscription:" + e.getMessage());
+ fail("Failure to register consumer:" + e.getMessage());
}
// put test messages into a queue
putGivenNumberOfMessages(queue, 4);
// assert received messages
- List<MessageInstance> messages = subscription.getMessages();
+ List<MessageInstance> messages = consumer.getMessages();
assertEquals("Only 2 messages should be returned", 2, messages.size());
assertEquals("ID of first message should be 1", 1l,
(messages.get(0).getMessage()).getMessageNumber());
@@ -1048,52 +1068,60 @@ public class SimpleAMQQueueTest extends
final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateRandomUUID(), "testActiveConsumerCount", false,
"testOwner", false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
- //verify adding an active subscription increases the count
- final MockSubscription subscription1 = new MockSubscription();
- subscription1.setActive(true);
- subscription1.setState(SubscriptionTarget.State.ACTIVE);
+ //verify adding an active consumer increases the count
+ final MockConsumer consumer1 = new MockConsumer();
+ consumer1.setActive(true);
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- queue.registerSubscription(subscription1, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+ queue.addConsumer(consumer1,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.noneOf(Consumer.Option.class));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify adding an inactive subscription doesn't increase the count
- final MockSubscription subscription2 = new MockSubscription();
- subscription2.setActive(false);
- subscription2.setState(SubscriptionTarget.State.SUSPENDED);
+ //verify adding an inactive consumer doesn't increase the count
+ final MockConsumer consumer2 = new MockConsumer();
+ consumer2.setActive(false);
+ consumer2.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- queue.registerSubscription(subscription2, null, createMessage(-1l).getClass(), "test", EnumSet.noneOf(Subscription.Option.class));
+ queue.addConsumer(consumer2,
+ null,
+ createMessage(-1l).getClass(),
+ "test",
+ EnumSet.noneOf(Consumer.Option.class));
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
//verify behaviour in face of expected state changes:
- //verify a subscription going suspended->active increases the count
- subscription2.setState(SubscriptionTarget.State.ACTIVE);
+ //verify a consumer going suspended->active increases the count
+ consumer2.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
- //verify a subscription going active->suspended decreases the count
- subscription2.setState(SubscriptionTarget.State.SUSPENDED);
+ //verify a consumer going active->suspended decreases the count
+ consumer2.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going suspended->closed doesn't change the count
- subscription2.setState(SubscriptionTarget.State.CLOSED);
+ //verify a consumer going suspended->closed doesn't change the count
+ consumer2.setState(ConsumerTarget.State.CLOSED);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going active->active doesn't change the count
- subscription1.setState(SubscriptionTarget.State.ACTIVE);
+ //verify a consumer going active->active doesn't change the count
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- subscription1.setState(SubscriptionTarget.State.SUSPENDED);
+ consumer1.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- //verify a subscription going suspended->suspended doesn't change the count
- subscription1.setState(SubscriptionTarget.State.SUSPENDED);
+ //verify a consumer going suspended->suspended doesn't change the count
+ consumer1.setState(ConsumerTarget.State.SUSPENDED);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
- subscription1.setState(SubscriptionTarget.State.ACTIVE);
+ consumer1.setState(ConsumerTarget.State.ACTIVE);
assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
- //verify a subscription going active->closed decreases the count
- subscription1.setState(SubscriptionTarget.State.CLOSED);
+ //verify a consumer going active->closed decreases the count
+ consumer1.setState(ConsumerTarget.State.CLOSED);
assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
}
@@ -1248,9 +1276,9 @@ public class SimpleAMQQueueTest extends
return _queue;
}
- public MockSubscription getSubscription()
+ public MockConsumer getConsumer()
{
- return _subscriptionTarget;
+ return _consumerTarget;
}
public Map<String,Object> getArguments()
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (from r1564601, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java&r1=1564601&r2=1564703&rev=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/SubscriptionTarget_0_10.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Wed Feb 5 10:29:55 2014
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.AMQException;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -34,8 +33,8 @@ import org.apache.qpid.server.protocol.M
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.AbstractSubscriptionTarget;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -46,7 +45,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-public class SubscriptionTarget_0_10 extends AbstractSubscriptionTarget implements FlowCreditManager.FlowCreditManagerListener
+public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
{
private static final Option[] BATCHED = new Option[] { Option.BATCH };
@@ -69,16 +68,16 @@ public class SubscriptionTarget_0_10 ext
private final Map<String, Object> _arguments;
private int _deferredMessageCredit;
private long _deferredSizeCredit;
- private Subscription _subscription;
+ private Consumer _consumer;
- public SubscriptionTarget_0_10(ServerSession session,
- String name,
- MessageAcceptMode acceptMode,
- MessageAcquireMode acquireMode,
- MessageFlowMode flowMode,
- FlowCreditManager_0_10 creditManager,
- Map<String, Object> arguments)
+ public ConsumerTarget_0_10(ServerSession session,
+ String name,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ MessageFlowMode flowMode,
+ FlowCreditManager_0_10 creditManager,
+ Map<String, Object> arguments)
{
super(State.SUSPENDED);
_session = session;
@@ -93,9 +92,9 @@ public class SubscriptionTarget_0_10 ext
_name = name;
}
- public Subscription getSubscription()
+ public Consumer getConsumer()
{
- return _subscription;
+ return _consumer;
}
public boolean isSuspended()
@@ -108,7 +107,7 @@ public class SubscriptionTarget_0_10 ext
boolean closed = false;
State state = getState();
- getSubscription().getSendLock();
+ getConsumer().getSendLock();
try
{
while(!closed && state != State.CLOSED)
@@ -124,7 +123,7 @@ public class SubscriptionTarget_0_10 ext
}
finally
{
- getSubscription().releaseSendLock();
+ getConsumer().releaseSendLock();
}
return closed;
@@ -255,8 +254,8 @@ public class SubscriptionTarget_0_10 ext
Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
- xfr = batch ? new MessageTransfer(getSubscription().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
- : new MessageTransfer(getSubscription().getName(),_acceptMode,_acquireMode,header,msg.getBody());
+ xfr = batch ? new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
+ : new MessageTransfer(getConsumer().getName(),_acceptMode,_acquireMode,header,msg.getBody());
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
{
@@ -353,7 +352,7 @@ public class SubscriptionTarget_0_10 ext
{
entry.setRedelivered();
entry.routeToAlternate(null, null);
- if(entry.isAcquiredBy(getSubscription()))
+ if(entry.isAcquiredBy(getConsumer()))
{
entry.delete();
}
@@ -450,7 +449,7 @@ public class SubscriptionTarget_0_10 ext
{
try
{
- getSubscription().getSendLock();
+ getConsumer().getSendLock();
updateState(State.ACTIVE, State.SUSPENDED);
_stopped.set(true);
@@ -459,7 +458,7 @@ public class SubscriptionTarget_0_10 ext
}
finally
{
- getSubscription().releaseSendLock();
+ getConsumer().releaseSendLock();
}
}
@@ -519,7 +518,7 @@ public class SubscriptionTarget_0_10 ext
public void acknowledge(MessageInstance entry)
{
// TODO Fix Store Context / cleanup
- if(entry.isAcquiredBy(getSubscription()))
+ if(entry.isAcquiredBy(getConsumer()))
{
_unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
_unacknowledgedCount.decrementAndGet();
@@ -530,7 +529,7 @@ public class SubscriptionTarget_0_10 ext
public void flush() throws AMQException
{
flushCreditState(true);
- getSubscription().flush();
+ getConsumer().flush();
stop();
}
@@ -560,13 +559,13 @@ public class SubscriptionTarget_0_10 ext
@Override
- public void subscriptionRegistered(final Subscription sub)
+ public void consumerAdded(final Consumer sub)
{
- _subscription = sub;
+ _consumer = sub;
}
@Override
- public void subscriptionRemoved(final Subscription sub)
+ public void consumerRemoved(final Consumer sub)
{
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Wed Feb 5 10:29:55 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -32,9 +31,9 @@ class ExplicitAcceptDispositionChangeLis
private final MessageInstance _entry;
- private final SubscriptionTarget_0_10 _target;
+ private final ConsumerTarget_0_10 _target;
- public ExplicitAcceptDispositionChangeListener(MessageInstance entry, SubscriptionTarget_0_10 target)
+ public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
{
_entry = entry;
_target = target;
@@ -42,7 +41,7 @@ class ExplicitAcceptDispositionChangeLis
public void onAccept()
{
- if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
_target.getSessionModel().acknowledge(_target, _entry);
}
@@ -55,7 +54,7 @@ class ExplicitAcceptDispositionChangeLis
public void onRelease(boolean setRedelivered)
{
- if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
_target.release(_entry, setRedelivered);
}
@@ -67,7 +66,7 @@ class ExplicitAcceptDispositionChangeLis
public void onReject()
{
- if(_target != null && _entry.isAcquiredBy(_target.getSubscription()))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
_target.reject(_entry);
}
@@ -80,7 +79,7 @@ class ExplicitAcceptDispositionChangeLis
public boolean acquire()
{
- return _entry.acquire(_target.getSubscription());
+ return _entry.acquire(_target.getConsumer());
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Wed Feb 5 10:29:55 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
{
@@ -31,9 +30,9 @@ class ImplicitAcceptDispositionChangeLis
private final MessageInstance _entry;
- private SubscriptionTarget_0_10 _target;
+ private ConsumerTarget_0_10 _target;
- public ImplicitAcceptDispositionChangeListener(MessageInstance entry, SubscriptionTarget_0_10 target)
+ public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
{
_entry = entry;
_target = target;
@@ -46,7 +45,7 @@ class ImplicitAcceptDispositionChangeLis
public void onRelease(boolean setRedelivered)
{
- if(_entry.isAcquiredBy(_target.getSubscription()))
+ if(_entry.isAcquiredBy(_target.getConsumer()))
{
_target.release(_entry, setRedelivered);
}
@@ -58,7 +57,7 @@ class ImplicitAcceptDispositionChangeLis
public void onReject()
{
- if(_entry.isAcquiredBy(_target.getSubscription()))
+ if(_entry.isAcquiredBy(_target.getConsumer()))
{
_target.reject(_entry);
}
@@ -71,7 +70,7 @@ class ImplicitAcceptDispositionChangeLis
public boolean acquire()
{
- boolean acquired = _entry.acquire(_target.getSubscription());
+ boolean acquired = _entry.acquire(_target.getConsumer());
if(acquired)
{
_target.recordUnacknowledged(_entry);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Wed Feb 5 10:29:55 2014
@@ -22,17 +22,16 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.transport.Method;
public class MessageAcceptCompletionListener implements Method.CompletionListener
{
- private final SubscriptionTarget_0_10 _sub;
+ private final ConsumerTarget_0_10 _sub;
private final MessageInstance _entry;
private final ServerSession _session;
private boolean _restoreCredit;
- public MessageAcceptCompletionListener(SubscriptionTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
+ public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
{
super();
_sub = sub;
@@ -47,7 +46,7 @@ public class MessageAcceptCompletionList
{
_sub.restoreCredit(_entry.getMessage());
}
- if(_entry.isAcquiredBy(_sub.getSubscription()))
+ if(_entry.isAcquiredBy(_sub.getConsumer()))
{
_session.acknowledge(_sub, _entry);
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Wed Feb 5 10:29:55 2014
@@ -282,8 +282,8 @@ public class ServerConnectionDelegate ex
private void stopAllSubscriptions(Connection conn, SessionDetach dtc)
{
final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
- final Collection<SubscriptionTarget_0_10> subs = ssn.getSubscriptions();
- for (SubscriptionTarget_0_10 subscription_0_10 : subs)
+ final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subs)
{
subscription_0_10.stop();
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Wed Feb 5 10:29:55 2014
@@ -63,7 +63,6 @@ import org.apache.qpid.server.queue.Queu
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
-import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
@@ -137,7 +136,7 @@ public class ServerSession extends Sessi
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private Map<String, SubscriptionTarget_0_10> _subscriptions = new ConcurrentHashMap<String, SubscriptionTarget_0_10>();
+ private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
@@ -400,7 +399,7 @@ public class ServerSession extends Sessi
// Broker shouldn't block awaiting close - thus do override this method to do nothing
}
- public void acknowledge(final SubscriptionTarget_0_10 sub, final MessageInstance entry)
+ public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
{
_transaction.dequeue(entry.getOwningResource(), entry.getMessage(),
new ServerTransaction.Action()
@@ -421,22 +420,22 @@ public class ServerSession extends Sessi
});
}
- public Collection<SubscriptionTarget_0_10> getSubscriptions()
+ public Collection<ConsumerTarget_0_10> getSubscriptions()
{
return _subscriptions.values();
}
- public void register(String destination, SubscriptionTarget_0_10 sub)
+ public void register(String destination, ConsumerTarget_0_10 sub)
{
_subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub);
}
- public SubscriptionTarget_0_10 getSubscription(String destination)
+ public ConsumerTarget_0_10 getSubscription(String destination)
{
return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
}
- public void unregister(SubscriptionTarget_0_10 sub)
+ public void unregister(ConsumerTarget_0_10 sub)
{
_subscriptions.remove(sub.getName());
sub.close();
@@ -808,8 +807,8 @@ public class ServerSession extends Sessi
void unregisterSubscriptions()
{
- final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
- for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
unregister(subscription_0_10);
}
@@ -817,8 +816,8 @@ public class ServerSession extends Sessi
void stopSubscriptions()
{
- final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
- for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.stop();
}
@@ -827,8 +826,8 @@ public class ServerSession extends Sessi
public void receivedComplete()
{
- final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
- for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.flushCreditState(false);
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Wed Feb 5 10:29:55 2014
@@ -46,7 +46,7 @@ import org.apache.qpid.server.store.Dura
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -257,7 +257,7 @@ public class ServerSessionDelegate exten
return;
}
- SubscriptionTarget_0_10 target = new SubscriptionTarget_0_10((ServerSession)session, destination,
+ ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
method.getAcceptMode(),
method.getAcquireMode(),
MessageFlowMode.WINDOW,
@@ -268,31 +268,31 @@ public class ServerSessionDelegate exten
((ServerSession)session).register(destination, target);
try
{
- EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
{
- options.add(Subscription.Option.ACQUIRES);
+ options.add(Consumer.Option.ACQUIRES);
}
if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
- options.add(Subscription.Option.SEES_REQUEUES);
+ options.add(Consumer.Option.SEES_REQUEUES);
}
if(method.getExclusive())
{
- options.add(Subscription.Option.EXCLUSIVE);
+ options.add(Consumer.Option.EXCLUSIVE);
}
- Subscription sub =
- queue.registerSubscription(target,
- filterManager,
- MessageTransferMessage.class,
- destination,
- options);
+ Consumer sub =
+ queue.addConsumer(target,
+ filterManager,
+ MessageTransferMessage.class,
+ destination,
+ options);
}
- catch (AMQQueue.ExistingExclusiveSubscription existing)
+ catch (AMQQueue.ExistingExclusiveConsumer existing)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
}
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive)
+ catch (AMQQueue.ExistingConsumerPreventsExclusive exclusive)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
}
@@ -405,7 +405,7 @@ public class ServerSessionDelegate exten
{
String destination = method.getDestination();
- SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -422,7 +422,7 @@ public class ServerSessionDelegate exten
{
String destination = method.getDestination();
- SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1476,7 +1476,7 @@ public class ServerSessionDelegate exten
{
String destination = sfm.getDestination();
- SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1493,7 +1493,7 @@ public class ServerSessionDelegate exten
{
String destination = stop.getDestination();
- SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1511,7 +1511,7 @@ public class ServerSessionDelegate exten
{
String destination = flow.getDestination();
- SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Feb 5 10:29:55 2014
@@ -69,8 +69,7 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
@@ -113,7 +112,7 @@ public class AMQChannel implements AMQSe
private IncomingMessage _currentMessage;
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, SubscriptionTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, SubscriptionTarget_0_8>();
+ private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
private final MessageStore _messageStore;
@@ -488,10 +487,10 @@ public class AMQChannel implements AMQSe
}
- public Subscription getSubscription(AMQShortString tag)
+ public Consumer getSubscription(AMQShortString tag)
{
- final SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
- return target == null ? null : target.getSubscription();
+ final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
+ return target == null ? null : target.getConsumer();
}
/**
@@ -522,30 +521,30 @@ public class AMQChannel implements AMQSe
throw new AMQException("Consumer already exists with same tag: " + tag);
}
- SubscriptionTarget_0_8 target;
- EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class);
+ ConsumerTarget_0_8 target;
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
- target = SubscriptionTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
- options.add(Subscription.Option.TRANSIENT);
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+ options.add(Consumer.Option.TRANSIENT);
}
else if(acks)
{
- target = SubscriptionTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
- options.add(Subscription.Option.ACQUIRES);
- options.add(Subscription.Option.SEES_REQUEUES);
+ target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
}
else
{
- target = SubscriptionTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
- options.add(Subscription.Option.ACQUIRES);
- options.add(Subscription.Option.SEES_REQUEUES);
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
}
if(exclusive)
{
- options.add(Subscription.Option.EXCLUSIVE);
+ options.add(Consumer.Option.EXCLUSIVE);
}
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
@@ -557,8 +556,12 @@ public class AMQChannel implements AMQSe
try
{
- Subscription sub =
- queue.registerSubscription(target, FilterManagerFactory.createManager(FieldTable.convertToMap(filters)), AMQMessage.class, AMQShortString.toString(tag), options);
+ Consumer sub =
+ queue.addConsumer(target,
+ FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
+ AMQMessage.class,
+ AMQShortString.toString(tag),
+ options);
}
catch (AMQException e)
{
@@ -582,8 +585,8 @@ public class AMQChannel implements AMQSe
public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
{
- SubscriptionTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
- Subscription sub = target == null ? null : target.getSubscription();
+ ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
+ Consumer sub = target == null ? null : target.getConsumer();
if (sub != null)
{
sub.close();
@@ -651,14 +654,14 @@ public class AMQChannel implements AMQSe
}
}
- for (Map.Entry<AMQShortString, SubscriptionTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
+ for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
{
if (_logger.isInfoEnabled())
{
_logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- Subscription sub = me.getValue().getSubscription();
+ Consumer sub = me.getValue().getConsumer();
sub.close();
@@ -674,14 +677,14 @@ public class AMQChannel implements AMQSe
* @param entry the record of the message on the queue that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
* delivery tag)
- * @param subscription The consumer that is to acknowledge this message.
+ * @param consumer The consumer that is to acknowledge this message.
*/
- public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Subscription subscription)
+ public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer)
{
if (_logger.isDebugEnabled())
{
_logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
- + ") for " + subscription);
+ + ") for " + consumer);
}
@@ -928,9 +931,9 @@ public class AMQChannel implements AMQSe
if (wasSuspended)
{
// may need to deliver queued messages
- for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values())
+ for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
- s.getSubscription().externalStateChange();
+ s.getConsumer().externalStateChange();
}
}
@@ -944,15 +947,15 @@ public class AMQChannel implements AMQSe
if (!wasSuspended)
{
// may need to deliver queued messages
- for (SubscriptionTarget_0_8 s : _tag2SubscriptionTargetMap.values())
+ for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
try
{
- s.getSubscription().getSendLock();
+ s.getConsumer().getSendLock();
}
finally
{
- s.getSubscription().releaseSendLock();
+ s.getConsumer().releaseSendLock();
}
}
}
@@ -1029,10 +1032,10 @@ public class AMQChannel implements AMQSe
boolean requiresSuspend = _suspended.compareAndSet(false,true);
// ensure all subscriptions have seen the change to the channel state
- for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
+ for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getSubscription().getSendLock();
- sub.getSubscription().releaseSendLock();
+ sub.getConsumer().getSendLock();
+ sub.getConsumer().releaseSendLock();
}
try
@@ -1052,7 +1055,7 @@ public class AMQChannel implements AMQSe
for(MessageInstance entry : _resendList)
{
- Subscription sub = entry.getDeliveredSubscription();
+ Consumer sub = entry.getDeliveredConsumer();
if(sub == null || sub.isClosed())
{
entry.release();
@@ -1067,9 +1070,9 @@ public class AMQChannel implements AMQSe
if(requiresSuspend)
{
_suspended.set(false);
- for(SubscriptionTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
+ for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getSubscription().externalStateChange();
+ sub.getConsumer().externalStateChange();
}
}
@@ -1125,7 +1128,7 @@ public class AMQChannel implements AMQSe
private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Subscription sub, final MessageInstance entry, final long deliveryTag)
+ public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
{
addUnacknowledgedMessage(entry, deliveryTag, sub);
}
@@ -1472,7 +1475,7 @@ public class AMQChannel implements AMQSe
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
- final Subscription sub = rejectedQueueEntry.getDeliveredSubscription();
+ final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>()
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Wed Feb 5 10:29:55 2014
@@ -94,7 +94,7 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -1668,7 +1668,7 @@ public class AMQProtocolEngine implement
}
@Override
- public void deliverToClient(final Subscription sub, final ServerMessage message,
+ public void deliverToClient(final Consumer sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
throws AMQException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org