You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/05 11:29:57 UTC
svn commit: r1564703 [2/4] - in
/qpid/branches/java-broker-amqp-1-0-management/java:
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/logging/actors/
broker-core/src/main/java/org/apache/qpid/s...
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Feb 5 10:29:55 2014
@@ -29,7 +29,7 @@ import org.apache.qpid.server.message.In
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
@@ -186,7 +186,7 @@ public abstract class QueueEntryImpl imp
public boolean acquire()
{
- return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
+ return acquire(NON_CONSUMER_ACQUIRED_STATE);
}
private boolean acquire(final EntryState state)
@@ -201,7 +201,7 @@ public abstract class QueueEntryImpl imp
return acquired;
}
- public boolean acquire(Subscription sub)
+ public boolean acquire(Consumer sub)
{
final boolean acquired = acquire(sub.getOwningState());
if(acquired)
@@ -211,17 +211,17 @@ public abstract class QueueEntryImpl imp
return acquired;
}
- public boolean acquiredBySubscription()
+ public boolean acquiredByConsumer()
{
- return (_state instanceof SubscriptionAcquiredState);
+ return (_state instanceof ConsumerAcquiredState);
}
- public boolean isAcquiredBy(Subscription subscription)
+ public boolean isAcquiredBy(Consumer consumer)
{
EntryState state = _state;
- return state instanceof SubscriptionAcquiredState
- && ((SubscriptionAcquiredState)state).getSubscription() == subscription;
+ return state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer;
}
public void release()
@@ -231,7 +231,7 @@ public abstract class QueueEntryImpl imp
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
- if(state instanceof SubscriptionAcquiredState)
+ if(state instanceof ConsumerAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
@@ -263,12 +263,12 @@ public abstract class QueueEntryImpl imp
return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED));
}
- public Subscription getDeliveredSubscription()
+ public Consumer getDeliveredConsumer()
{
EntryState state = _state;
- if (state instanceof SubscriptionAcquiredState)
+ if (state instanceof ConsumerAcquiredState)
{
- return ((SubscriptionAcquiredState) state).getSubscription();
+ return ((ConsumerAcquiredState) state).getConsumer();
}
else
{
@@ -278,16 +278,16 @@ public abstract class QueueEntryImpl imp
public void reject()
{
- Subscription subscription = getDeliveredSubscription();
+ Consumer consumer = getDeliveredConsumer();
- if (subscription != null)
+ if (consumer != null)
{
if (_rejectedBy == null)
{
_rejectedBy = new HashSet<Long>();
}
- _rejectedBy.add(subscription.getSubscriptionID());
+ _rejectedBy.add(consumer.getId());
}
else
{
@@ -295,12 +295,12 @@ public abstract class QueueEntryImpl imp
}
}
- public boolean isRejectedBy(Subscription subscription)
+ public boolean isRejectedBy(Consumer consumer)
{
- if (_rejectedBy != null) // We have subscriptions that rejected this message
+ if (_rejectedBy != null) // We have consumers that rejected this message
{
- return _rejectedBy.contains(subscription.getSubscriptionID());
+ return _rejectedBy.contains(consumer.getId());
}
else // This message hasn't been rejected yet.
{
@@ -314,8 +314,8 @@ public abstract class QueueEntryImpl imp
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- Subscription s = null;
- if (state instanceof SubscriptionAcquiredState)
+ Consumer s = null;
+ if (state instanceof ConsumerAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
}
@@ -490,7 +490,7 @@ public abstract class QueueEntryImpl imp
@Override
public boolean resend() throws AMQException
{
- Subscription sub = getDeliveredSubscription();
+ Consumer sub = getDeliveredConsumer();
if(sub != null)
{
return sub.resend(this);
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Wed Feb 5 10:29:55 2014
@@ -32,8 +32,8 @@ import org.apache.qpid.transport.Transpo
/**
* QueueRunners are Runnables used to process a queue when requiring
- * asynchronous message delivery to subscriptions, which is necessary
- * when straight-through delivery of a message to a subscription isn't
+ * asynchronous message delivery to consumers, which is necessary
+ * when straight-through delivery of a message to a consumer isn't
* possible during the enqueue operation.
*/
public class QueueRunner implements Runnable
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Feb 5 10:29:55 2014
@@ -46,8 +46,8 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -56,8 +56,8 @@ import org.apache.qpid.server.util.State
import org.apache.qpid.server.virtualhost.VirtualHost;
public class SimpleAMQQueue implements AMQQueue,
- StateChangeListener<QueueSubscription, Subscription.State>,
- MessageGroupManager.SubscriptionResetHelper
+ StateChangeListener<QueueConsumer, Consumer.State>,
+ MessageGroupManager.ConsumerResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -92,9 +92,9 @@ public class SimpleAMQQueue implements A
private final QueueEntryList<QueueEntry> _entries;
- private final SubscriptionList _subscriptionList = new SubscriptionList();
+ private final QueueConsumerList _consumerList = new QueueConsumerList();
- private volatile QueueSubscription _exclusiveSubscriber;
+ private volatile QueueConsumer _exclusiveSubscriber;
@@ -177,8 +177,8 @@ public class SimpleAMQQueue implements A
private int _maximumDeliveryCount;
private final MessageGroupManager _messageGroupManager;
- private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
- new ArrayList<SubscriptionRegistrationListener>();
+ private final Collection<ConsumerRegistrationListener> _consumerListeners =
+ new ArrayList<ConsumerRegistrationListener>();
private AMQQueue.NotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
@@ -247,7 +247,7 @@ public class SimpleAMQQueue implements A
}
else
{
- _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
+ _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get(
Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
@@ -378,15 +378,15 @@ public class SimpleAMQQueue implements A
return _name;
}
- // ------ Manage Subscriptions
+ // ------ Manage Consumers
@Override
- public QueueSubscription registerSubscription(final SubscriptionTarget target,
- final FilterManager filters,
- final Class<? extends ServerMessage> messageClass,
- final String consumerName,
- EnumSet<Subscription.Option> optionSet) throws AMQException
+ public QueueConsumer addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ EnumSet<Consumer.Option> optionSet) throws AMQException
{
// Access control
@@ -396,59 +396,59 @@ public class SimpleAMQQueue implements A
}
- if (hasExclusiveSubscriber())
+ if (hasExclusiveConsumer())
{
- throw new ExistingExclusiveSubscription();
+ throw new ExistingExclusiveConsumer();
}
- boolean exclusive = optionSet.contains(Subscription.Option.EXCLUSIVE);
- boolean isTransient = optionSet.contains(Subscription.Option.TRANSIENT);
+ boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE);
+ boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT);
if (exclusive && !isTransient && getConsumerCount() != 0)
{
- throw new ExistingSubscriptionPreventsExclusive();
+ throw new ExistingConsumerPreventsExclusive();
}
- QueueSubscription subscription = new QueueSubscription(filters, messageClass,
- optionSet.contains(Subscription.Option.ACQUIRES),
- optionSet.contains(Subscription.Option.SEES_REQUEUES),
- consumerName, optionSet.contains(Subscription.Option.TRANSIENT), target);
- target.subscriptionRegistered(subscription);
+ QueueConsumer consumer = new QueueConsumer(filters, messageClass,
+ optionSet.contains(Consumer.Option.ACQUIRES),
+ optionSet.contains(Consumer.Option.SEES_REQUEUES),
+ consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
+ target.consumerAdded(consumer);
if (exclusive && !isTransient)
{
- _exclusiveSubscriber = subscription;
+ _exclusiveSubscriber = consumer;
}
- if(subscription.isActive())
+ if(consumer.isActive())
{
_activeSubscriberCount.incrementAndGet();
}
- subscription.setStateListener(this);
- subscription.setQueueContext(new QueueContext(_entries.getHead()));
+ consumer.setStateListener(this);
+ consumer.setQueueContext(new QueueContext(_entries.getHead()));
if (!isDeleted())
{
- subscription.setQueue(this, exclusive);
+ consumer.setQueue(this, exclusive);
if(_nolocal)
{
- subscription.setNoLocal(_nolocal);
+ consumer.setNoLocal(_nolocal);
}
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ for(ConsumerRegistrationListener listener : _consumerListeners)
{
- listener.subscriptionRegistered(this, subscription);
+ listener.consumerAdded(this, consumer);
}
}
- _subscriptionList.add(subscription);
+ _consumerList.add(consumer);
- //Increment consumerCountHigh if necessary. (un)registerSubscription are both
+ //Increment consumerCountHigh if necessary. (un)registerConsumer are both
//synchronized methods so we don't need additional synchronization here
if(_consumerCountHigh.get() < getConsumerCount())
{
@@ -457,7 +457,7 @@ public class SimpleAMQQueue implements A
if (isDeleted())
{
- subscription.queueDeleted();
+ consumer.queueDeleted();
}
}
else
@@ -465,27 +465,27 @@ public class SimpleAMQQueue implements A
// TODO
}
- deliverAsync(subscription);
+ deliverAsync(consumer);
- return subscription;
+ return consumer;
}
- synchronized void unregisterSubscription(final QueueSubscription subscription) throws AMQException
+ synchronized void unregisterConsumer(final QueueConsumer consumer) throws AMQException
{
- if (subscription == null)
+ if (consumer == null)
{
- throw new NullPointerException("subscription argument is null");
+ throw new NullPointerException("consumer argument is null");
}
- boolean removed = _subscriptionList.remove(subscription);
+ boolean removed = _consumerList.remove(consumer);
if (removed)
{
- subscription.close();
+ consumer.close();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
- subscription.setQueueContext(null);
+ consumer.setQueueContext(null);
if(!isDeleted() && isExclusive() && getConsumerCount() == 0)
{
@@ -494,20 +494,20 @@ public class SimpleAMQQueue implements A
if(_messageGroupManager != null)
{
- resetSubPointersForGroups(subscription, true);
+ resetSubPointersForGroups(consumer, true);
}
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ for(ConsumerRegistrationListener listener : _consumerListeners)
{
- listener.subscriptionUnregistered(this, subscription);
+ listener.consumerRemoved(this, consumer);
}
}
// auto-delete queues must be deleted if there are no remaining subscribers
- if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
+ if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 )
{
if (_logger.isInfoEnabled())
{
@@ -516,57 +516,57 @@ public class SimpleAMQQueue implements A
getVirtualHost().removeQueue(this);
- // we need to manually fire the event to the removed subscription (which was the last one left for this
- // queue. This is because the delete method uses the subscription set which has just been cleared
- subscription.queueDeleted();
+ // we need to manually fire the event to the removed consumer (which was the last one left for this
+ // queue. This is because the delete method uses the consumer set which has just been cleared
+ consumer.queueDeleted();
}
}
}
- public Collection<Subscription> getConsumers()
+ public Collection<Consumer> getConsumers()
{
- List<Subscription> consumers = new ArrayList<Subscription>();
- SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator();
+ List<Consumer> consumers = new ArrayList<Consumer>();
+ QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator();
while(iter.advance())
{
- consumers.add(iter.getNode().getSubscription());
+ consumers.add(iter.getNode().getConsumer());
}
return consumers;
}
- public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- _subscriptionListeners.add(listener);
+ _consumerListeners.add(listener);
}
}
- public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- _subscriptionListeners.remove(listener);
+ _consumerListeners.remove(listener);
}
}
- public void resetSubPointersForGroups(QueueSubscription subscription, boolean clearAssignments)
+ public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments)
{
- QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
if(clearAssignments)
{
- _messageGroupManager.clearAssignments(subscription);
+ _messageGroupManager.clearAssignments(consumer);
}
if(entry != null)
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance())
{
- QueueSubscription sub = subscriberIter.getNode().getSubscription();
+ QueueConsumer sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -644,35 +644,35 @@ public class SimpleAMQQueue implements A
QueueEntry entry;
- final QueueSubscription exclusiveSub = _exclusiveSubscriber;
+ final QueueConsumer exclusiveSub = _exclusiveSubscriber;
entry = _entries.add(message);
if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
{
/*
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+ iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
*/
- SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
- SubscriptionList.SubscriptionNode nextNode = node.findNext();
+ QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
+ QueueConsumerList.ConsumerNode nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _consumerList.getHead().findNext();
}
while (nextNode != null)
{
- if (_subscriptionList.updateMarkedNode(node, nextNode))
+ if (_consumerList.updateMarkedNode(node, nextNode))
{
break;
}
else
{
- node = _subscriptionList.getMarkedNode();
+ node = _consumerList.getMarkedNode();
nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _consumerList.getHead().findNext();
}
}
}
@@ -686,13 +686,13 @@ public class SimpleAMQQueue implements A
if (nextNode == null)
{
loops--;
- nextNode = _subscriptionList.getHead();
+ nextNode = _consumerList.getHead();
}
else
{
- // if subscription at end, and active, offer
- QueueSubscription sub = nextNode.getSubscription();
- deliverToSubscription(sub, entry);
+ // if consumer at end, and active, offer
+ QueueConsumer sub = nextNode.getConsumer();
+ deliverToConsumer(sub, entry);
}
nextNode = nextNode.findNext();
@@ -702,7 +702,7 @@ public class SimpleAMQQueue implements A
if (entry.isAvailable())
{
- checkSubscriptionsNotAheadOfDelivery(entry);
+ checkConsumersNotAheadOfDelivery(entry);
if (exclusiveSub != null)
{
@@ -723,7 +723,7 @@ public class SimpleAMQQueue implements A
}
- private void deliverToSubscription(final QueueSubscription sub, final QueueEntry entry)
+ private void deliverToConsumer(final QueueConsumer sub, final QueueEntry entry)
throws AMQException
{
@@ -732,14 +732,14 @@ public class SimpleAMQQueue implements A
try
{
if (!sub.isSuspended()
- && subscriptionReadyAndHasInterest(sub, entry)
+ && consumerReadyAndHasInterest(sub, entry)
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
if (sub.acquires() && !assign(sub, entry))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
+ // to acquire the entry for this consumer
sub.restoreCredit(entry);
}
else
@@ -755,7 +755,7 @@ public class SimpleAMQQueue implements A
}
}
- private boolean assign(final QueueSubscription sub, final QueueEntry entry)
+ private boolean assign(final QueueConsumer sub, final QueueEntry entry)
{
if(_messageGroupManager == null)
{
@@ -769,17 +769,17 @@ public class SimpleAMQQueue implements A
}
}
- private boolean mightAssign(final QueueSubscription sub, final QueueEntry entry)
+ private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry)
{
if(_messageGroupManager == null || !sub.acquires())
{
return true;
}
- QueueSubscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+ QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
return (assigned == null) || (assigned == sub);
}
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
// Simple Queues don't :-)
@@ -813,7 +813,7 @@ public class SimpleAMQQueue implements A
getAtomicQueueCount().incrementAndGet();
}
- private void deliverMessage(final QueueSubscription sub, final QueueEntry entry, boolean batch)
+ private void deliverMessage(final QueueConsumer sub, final QueueEntry entry, boolean batch)
throws AMQException
{
setLastSeenEntry(sub, entry);
@@ -824,13 +824,13 @@ public class SimpleAMQQueue implements A
sub.send(entry, batch);
}
- private boolean subscriptionReadyAndHasInterest(final QueueSubscription sub, final QueueEntry entry) throws AMQException
+ private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final QueueEntry entry) throws AMQException
{
return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
}
- private void setLastSeenEntry(final QueueSubscription sub, final QueueEntry entry)
+ private void setLastSeenEntry(final QueueConsumer sub, final QueueEntry entry)
{
QueueContext subContext = sub.getQueueContext();
if (subContext != null)
@@ -845,7 +845,7 @@ public class SimpleAMQQueue implements A
}
}
- private void updateSubRequeueEntry(final QueueSubscription sub, final QueueEntry entry)
+ private void updateSubRequeueEntry(final QueueConsumer sub, final QueueEntry entry)
{
QueueContext subContext = sub.getQueueContext();
@@ -865,11 +865,11 @@ public class SimpleAMQQueue implements A
public void requeue(QueueEntry entry)
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance() && entry.isAvailable())
{
- QueueSubscription sub = subscriberIter.getNode().getSubscription();
+ QueueConsumer sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -882,11 +882,11 @@ public class SimpleAMQQueue implements A
}
- public void dequeue(QueueEntry entry, Subscription sub)
+ public void dequeue(QueueEntry entry, Consumer sub)
{
decrementQueueCount();
decrementQueueSize(entry);
- if (entry.acquiredBySubscription())
+ if (entry.acquiredByConsumer())
{
_deliveredMessages.decrementAndGet();
}
@@ -914,17 +914,17 @@ public class SimpleAMQQueue implements A
_dequeueCount.incrementAndGet();
}
- public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
+ public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException
{
- /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
- entry to resend and move back the subscription pointer. */
+ /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
+ entry to resend and move back the consumer pointer. */
- subscription.getSendLock();
+ consumer.getSendLock();
try
{
- if (!subscription.isClosed())
+ if (!consumer.isClosed())
{
- deliverMessage((QueueSubscription)subscription, entry, false);
+ deliverMessage((QueueConsumer) consumer, entry, false);
return true;
}
else
@@ -934,7 +934,7 @@ public class SimpleAMQQueue implements A
}
finally
{
- subscription.releaseSendLock();
+ consumer.releaseSendLock();
}
}
@@ -942,7 +942,7 @@ public class SimpleAMQQueue implements A
public int getConsumerCount()
{
- return _subscriptionList.size();
+ return _consumerList.size();
}
public int getConsumerCountHigh()
@@ -1025,16 +1025,16 @@ public class SimpleAMQQueue implements A
}
- public void stateChanged(QueueSubscription sub, Subscription.State oldState, Subscription.State newState)
+ public void stateChanged(QueueConsumer sub, Consumer.State oldState, Consumer.State newState)
{
- if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
+ if (oldState == Consumer.State.ACTIVE && newState != Consumer.State.ACTIVE)
{
_activeSubscriberCount.decrementAndGet();
}
- else if (newState == Subscription.State.ACTIVE)
+ else if (newState == Consumer.State.ACTIVE)
{
- if (oldState != Subscription.State.ACTIVE)
+ if (oldState != Consumer.State.ACTIVE)
{
_activeSubscriberCount.incrementAndGet();
@@ -1058,12 +1058,12 @@ public class SimpleAMQQueue implements A
return _atomicQueueSize;
}
- public boolean hasExclusiveSubscriber()
+ public boolean hasExclusiveConsumer()
{
return _exclusiveSubscriber != null;
}
- private void setExclusiveSubscriber(QueueSubscription exclusiveSubscriber)
+ private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber)
{
_exclusiveSubscriber = exclusiveSubscriber;
}
@@ -1079,9 +1079,9 @@ public class SimpleAMQQueue implements A
return _entries;
}
- protected SubscriptionList getSubscriptionList()
+ protected QueueConsumerList getConsumerList()
{
- return _subscriptionList;
+ return _consumerList;
}
@@ -1315,11 +1315,11 @@ public class SimpleAMQQueue implements A
b.getExchange().removeBinding(b);
}
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
- while (subscriptionIter.advance())
+ while (consumerNodeIterator.advance())
{
- QueueSubscription s = subscriptionIter.getNode().getSubscription();
+ QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
if (s != null)
{
s.queueDeleted();
@@ -1449,7 +1449,7 @@ public class SimpleAMQQueue implements A
}
- public void deliverAsync(QueueSubscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
if(_exclusiveSubscriber == null)
{
@@ -1463,17 +1463,17 @@ public class SimpleAMQQueue implements A
}
- public void flushSubscription(Subscription sub) throws AMQException
+ public void flushConsumer(Consumer sub) throws AMQException
{
// Access control
if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
{
throw new AMQSecurityException("Permission denied: " + getName());
}
- flushSubscription(sub, Long.MAX_VALUE);
+ flushConsumer(sub, Long.MAX_VALUE);
}
- public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
+ public boolean flushConsumer(Consumer sub, long iterations) throws AMQException
{
boolean atTail = false;
final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
@@ -1494,8 +1494,8 @@ public class SimpleAMQQueue implements A
sub.getSendLock();
}
- atTail = attemptDelivery((QueueSubscription)sub, true);
- if (atTail && getNextAvailableEntry((QueueSubscription)sub) == null)
+ atTail = attemptDelivery((QueueConsumer)sub, true);
+ if (atTail && getNextAvailableEntry((QueueConsumer)sub) == null)
{
queueEmpty = true;
}
@@ -1529,21 +1529,21 @@ public class SimpleAMQQueue implements A
}
- // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
+ // if there's (potentially) more than one consumer the others will potentially not have been advanced to the
// next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
// which would give us memory "leak".
- if (!hasExclusiveSubscriber())
+ if (!hasExclusiveConsumer())
{
- advanceAllSubscriptions();
+ advanceAllConsumers();
}
return atTail;
}
/**
- * Attempt delivery for the given subscription.
+ * Attempt delivery for the given consumer.
*
- * Looks up the next node for the subscription and attempts to deliver it.
+ * Looks up the next node for the consumer and attempts to deliver it.
*
*
* @param sub
@@ -1551,7 +1551,7 @@ public class SimpleAMQQueue implements A
* @return true if we have completed all possible deliveries for this sub.
* @throws AMQException
*/
- private boolean attemptDelivery(QueueSubscription sub, boolean batch) throws AMQException
+ private boolean attemptDelivery(QueueConsumer sub, boolean batch) throws AMQException
{
boolean atTail = false;
@@ -1570,7 +1570,7 @@ public class SimpleAMQQueue implements A
if (sub.acquires() && !assign(sub, node))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
+ // to acquire the entry for this consumer
sub.restoreCredit(node);
}
else
@@ -1581,7 +1581,7 @@ public class SimpleAMQQueue implements A
}
else // Not enough Credit for message and wouldSuspend
{
- //QPID-1187 - Treat the subscription as suspended for this message
+ //QPID-1187 - Treat the consumer as suspended for this message
// and wait for the message to be removed to continue delivery.
subActive = false;
node.addStateChangeListener(new QueueEntryListener(sub));
@@ -1594,13 +1594,13 @@ public class SimpleAMQQueue implements A
return atTail || !subActive;
}
- protected void advanceAllSubscriptions() throws AMQException
+ protected void advanceAllConsumers() throws AMQException
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- while (subscriberIter.advance())
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+ while (consumerNodeIterator.advance())
{
- SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
- QueueSubscription sub = subNode.getSubscription();
+ QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode();
+ QueueConsumer sub = subNode.getConsumer();
if(sub.acquires())
{
getNextAvailableEntry(sub);
@@ -1612,7 +1612,7 @@ public class SimpleAMQQueue implements A
}
}
- private QueueEntry getNextAvailableEntry(final QueueSubscription sub)
+ private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
throws AMQException
{
QueueContext context = sub.getQueueContext();
@@ -1653,7 +1653,7 @@ public class SimpleAMQQueue implements A
}
}
- public boolean isEntryAheadOfSubscription(QueueEntry entry, QueueSubscription sub)
+ public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub)
{
QueueContext context = sub.getQueueContext();
if(context != null)
@@ -1672,14 +1672,14 @@ public class SimpleAMQQueue implements A
*
* A queue Runner is started whenever a state change occurs, e.g when a new
* message arrives on the queue and cannot be immediately delivered to a
- * subscription (i.e. asynchronous delivery is required). Unless there are
- * SubFlushRunners operating (due to subscriptions unsuspending) which are
+ * consumer (i.e. asynchronous delivery is required). Unless there are
+ * SubFlushRunners operating (due to consumers unsuspending) which are
* capable of accepting/delivering all messages then these messages would
* otherwise remain on the queue.
*
* processQueue should be running while there are messages on the queue AND
- * there are subscriptions that can deliver them. If there are no
- * subscriptions capable of delivering the remaining messages on the queue
+ * there are consumers that can deliver them. If there are no
+ * consumers capable of delivering the remaining messages on the queue
* then processQueue should stop to prevent spinning.
*
* Since processQueue is runs in a fixed size Executor, it should not run
@@ -1703,7 +1703,7 @@ public class SimpleAMQQueue implements A
boolean lastLoop = false;
int iterations = MAX_ASYNC_DELIVERIES;
- final int numSubs = _subscriptionList.size();
+ final int numSubs = _consumerList.size();
final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
@@ -1714,8 +1714,8 @@ public class SimpleAMQQueue implements A
// So whilst delivery/rejection is going on a processQueue thread will be running
while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
{
- // we want to have one extra loop after every subscription has reached the point where it cannot move
- // further, just in case the advance of one subscription in the last loop allows a different subscription to
+ // we want to have one extra loop after every consumer has reached the point where it cannot move
+ // further, just in case the advance of one consumer in the last loop allows a different consumer to
// move forward in the next iteration
if (previousStateChangeCount != stateChangeCount)
@@ -1727,14 +1727,14 @@ public class SimpleAMQQueue implements A
}
previousStateChangeCount = stateChangeCount;
- boolean allSubscriptionsDone = true;
- boolean subscriptionDone;
+ boolean allConsumersDone = true;
+ boolean consumerDone;
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
//iterate over the subscribers and try to advance their pointer
- while (subscriptionIter.advance())
+ while (consumerNodeIterator.advance())
{
- QueueSubscription sub = subscriptionIter.getNode().getSubscription();
+ QueueConsumer sub = consumerNodeIterator.getNode().getConsumer();
sub.getSendLock();
try
@@ -1742,8 +1742,8 @@ public class SimpleAMQQueue implements A
for(int i = 0 ; i < perSub; i++)
{
//attempt delivery. returns true if no further delivery currently possible to this sub
- subscriptionDone = attemptDelivery(sub, true);
- if (subscriptionDone)
+ consumerDone = attemptDelivery(sub, true);
+ if (consumerDone)
{
sub.flushBatched();
if (lastLoop && !sub.isSuspended())
@@ -1754,9 +1754,9 @@ public class SimpleAMQQueue implements A
}
else
{
- //this subscription can accept additional deliveries, so we must
+ //this consumer can accept additional deliveries, so we must
//keep going after this (if iteration slicing allows it)
- allSubscriptionsDone = false;
+ allConsumersDone = false;
lastLoop = false;
if(--iterations == 0)
{
@@ -1775,24 +1775,24 @@ public class SimpleAMQQueue implements A
}
}
- if(allSubscriptionsDone && lastLoop)
+ if(allConsumersDone && lastLoop)
{
//We have done an extra loop already and there are again
//again no further delivery attempts possible, only
//keep going if state change demands it.
deliveryIncomplete = false;
}
- else if(allSubscriptionsDone)
+ else if(allConsumersDone)
{
- //All subscriptions reported being done, but we have to do
+ //All consumers reported being done, but we have to do
//an extra loop if the iterations are not exhausted and
//there is still any work to be done
- deliveryIncomplete = _subscriptionList.size() != 0;
+ deliveryIncomplete = _consumerList.size() != 0;
lastLoop = true;
}
else
{
- //some subscriptions can still accept more messages,
+ //some consumers can still accept more messages,
//keep going if iteration count allows.
lastLoop = false;
deliveryIncomplete = true;
@@ -1970,9 +1970,9 @@ public class SimpleAMQQueue implements A
private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State>
{
- private final QueueSubscription _sub;
+ private final QueueConsumer _sub;
- public QueueEntryListener(final QueueSubscription sub)
+ public QueueEntryListener(final QueueConsumer sub)
{
_sub = sub;
}
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Wed Feb 5 10:29:55 2014
@@ -30,7 +30,7 @@ import java.util.UUID;
public class SortedQueue extends OutOfOrderQueue
{
//Lock object to synchronize enqueue. Used instead of the object
- //monitor to prevent lock order issues with subscription sendLocks
+ //monitor to prevent lock order issues with consumer sendLocks
//and consumer updates in the super classes
private final Object _sortedQueueLock = new Object();
private final String _sortedPropertyName;
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Wed Feb 5 10:29:55 2014
@@ -25,7 +25,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.transport.TransportException;
import java.util.concurrent.Executor;
@@ -38,7 +37,7 @@ class SubFlushRunner implements Runnable
private static final Logger _logger = Logger.getLogger(SubFlushRunner.class);
- private final QueueSubscription _sub;
+ private final QueueConsumer _sub;
private static int IDLE = 0;
private static int SCHEDULED = 1;
@@ -51,7 +50,7 @@ class SubFlushRunner implements Runnable
private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
private final AtomicBoolean _stateChange = new AtomicBoolean();
- public SubFlushRunner(QueueSubscription sub)
+ public SubFlushRunner(QueueConsumer sub)
{
_sub = sub;
}
@@ -65,7 +64,7 @@ class SubFlushRunner implements Runnable
try
{
CurrentActor.set(_sub.getLogActor());
- complete = getQueue().flushSubscription(_sub, ITERATIONS);
+ complete = getQueue().flushConsumer(_sub, ITERATIONS);
}
catch (AMQException e)
{
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (from r1564581, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java&r1=1564581&r2=1564703&rev=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Wed Feb 5 10:29:55 2014
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.server.subscription;
+package org.apache.qpid.server.consumer;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -35,7 +35,6 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.StateChangeListener;
@@ -46,29 +45,26 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-public class MockSubscription implements SubscriptionTarget
+public class MockConsumer implements ConsumerTarget
{
private final List<String> _messageIds;
private boolean _closed = false;
private String tag = "mocktag";
private AMQQueue queue = null;
- private StateChangeListener<SubscriptionTarget, State> _listener = null;
+ private StateChangeListener<ConsumerTarget, State> _listener = null;
private State _state = State.ACTIVE;
private ArrayList<MessageInstance> messages = new ArrayList<MessageInstance>();
private final Lock _stateChangeLock = new ReentrantLock();
- private static final AtomicLong idGenerator = new AtomicLong(0);
- // Create a simple ID that increments for ever new Subscription
private boolean _isActive = true;
- private Subscription _subscription;
- public MockSubscription()
+ public MockConsumer()
{
_messageIds = null;
}
- public MockSubscription(List<String> messageIds)
+ public MockConsumer(List<String> messageIds)
{
_messageIds = messageIds;
}
@@ -177,13 +173,12 @@ public class MockSubscription implements
}
@Override
- public void subscriptionRegistered(final Subscription sub)
+ public void consumerAdded(final Consumer sub)
{
- _subscription = sub;
}
@Override
- public void subscriptionRemoved(final Subscription sub)
+ public void consumerRemoved(final Consumer sub)
{
}
@@ -199,7 +194,7 @@ public class MockSubscription implements
}
@Override
- public void setStateListener(final StateChangeListener<SubscriptionTarget, State> listener)
+ public void setStateListener(final StateChangeListener<ConsumerTarget, State> listener)
{
_listener = listener;
}
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java (from r1564581, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java&r1=1564581&r2=1564703&rev=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java Wed Feb 5 10:29:55 2014
@@ -25,7 +25,7 @@ import java.util.List;
/**
* Test SUB Log Messages
*/
-public class SubscriptionMessagesTest extends AbstractTestMessages
+public class ConsumerMessagesTest extends AbstractTestMessages
{
public void testSubscriptionCreateALL()
{
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java Wed Feb 5 10:29:55 2014
@@ -38,7 +38,7 @@ public class VirtualHostMessagesTest ext
validateLogMessage(log, "VHT-1001", expected);
}
- public void testSubscriptionClosed()
+ public void testVirtualhostClosed()
{
_logMessage = VirtualHostMessages.CLOSED();
List<Object> log = performLog();
Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=1564703&r1=1564702&r2=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Wed Feb 5 10:29:55 2014
@@ -32,7 +32,7 @@ import java.util.ArrayList;
import java.util.EnumSet;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import static org.mockito.Mockito.when;
@@ -66,10 +66,10 @@ public class AMQPriorityQueueTest extend
queue.enqueue(createMessage(9L, (byte) 0));
// Register subscriber
- queue.registerSubscription(getSubscription(), null, null, "test", EnumSet.noneOf(Subscription.Option.class));
+ queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(Consumer.Option.class));
Thread.sleep(150);
- ArrayList<MessageInstance> msgs = getSubscription().getMessages();
+ ArrayList<MessageInstance> msgs = getConsumer().getMessages();
try
{
assertEquals(1L, msgs.get(0).getMessage().getMessageNumber());
Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java (from r1564581, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SubscriptionListTest.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SubscriptionListTest.java&r1=1564581&r2=1564703&rev=1564703&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SubscriptionListTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java Wed Feb 5 10:29:55 2014
@@ -22,28 +22,28 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionTarget;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.test.utils.QpidTestCase;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class SubscriptionListTest extends QpidTestCase
+public class ConsumerListTest extends QpidTestCase
{
- private SubscriptionList _subList;
- private QueueSubscription _sub1;
- private QueueSubscription _sub2;
- private QueueSubscription _sub3;
- private SubscriptionList.SubscriptionNode _node;
+ private QueueConsumerList _subList;
+ private QueueConsumer _sub1;
+ private QueueConsumer _sub2;
+ private QueueConsumer _sub3;
+ private QueueConsumerList.ConsumerNode _node;
protected void setUp()
{
- _subList = new SubscriptionList();
+ _subList = new QueueConsumerList();
- _sub1 = newMockSubscription();
- _sub2 = newMockSubscription();
- _sub3 = newMockSubscription();
+ _sub1 = newMockConsumer();
+ _sub2 = newMockConsumer();
+ _sub3 = newMockConsumer();
_subList.add(_sub1);
_subList.add(_sub2);
@@ -53,11 +53,11 @@ public class SubscriptionListTest extend
}
- private QueueSubscription newMockSubscription()
+ private QueueConsumer newMockConsumer()
{
- SubscriptionTarget target = mock(SubscriptionTarget.class);
+ ConsumerTarget target = mock(ConsumerTarget.class);
when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class));
- return new QueueSubscription(null,null,true,true,"sub",false,target);
+ return new QueueConsumer(null,null,true,true,"sub",false,target);
}
/**
@@ -67,14 +67,14 @@ public class SubscriptionListTest extend
*/
public void testFindNextSkipsFirstDeletedNode()
{
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub1).delete());
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub1).delete());
assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertEquals("Should have returned node for 2nd subscription", _sub2, _node.getSubscription());
+ assertEquals("Should have returned node for 2nd consumer", _sub2, _node.getConsumer());
assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertEquals("Should have returned node for 3rd subscription", _sub3, _node.getSubscription());
+ assertEquals("Should have returned node for 3rd consumer", _sub3, _node.getConsumer());
}
/**
@@ -86,11 +86,11 @@ public class SubscriptionListTest extend
{
assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub2).delete());
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub2).delete());
assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertEquals("Should have returned node for 3rd subscription", _sub3, _node.getSubscription());
+ assertEquals("Should have returned node for 3rd consumer", _sub3, _node.getConsumer());
}
/**
@@ -101,13 +101,13 @@ public class SubscriptionListTest extend
public void testFindNextSkipsLastDeletedNode()
{
assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertEquals("Should have returned node for 1st subscription", _sub1, _node.getSubscription());
+ assertEquals("Should have returned node for 1st consumer", _sub1, _node.getConsumer());
assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertEquals("Should have returned node for 2nd subscription", _sub2, _node.getSubscription());
+ assertEquals("Should have returned node for 2nd consumer", _sub2, _node.getConsumer());
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub3).delete());
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub3).delete());
assertNull("Returned node should be null", _node = _node.findNext());
}
@@ -119,13 +119,13 @@ public class SubscriptionListTest extend
*/
public void testFindNextSkipsMultipleDeletedNode()
{
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub1).delete());
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub2).delete());
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub1).delete());
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub2).delete());
assertNotNull("Returned node should not be null", _node = _node.findNext());
- assertEquals("Should have returned node for 3rd subscription", _sub3, _node.getSubscription());
+ assertEquals("Should have returned node for 3rd consumer", _sub3, _node.getConsumer());
}
/**
@@ -134,21 +134,21 @@ public class SubscriptionListTest extend
*/
public void testDeletedNodeStillPresent()
{
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub1).delete());
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub1).delete());
- assertNotNull("Node marked deleted should still be present", getNodeForSubscription(_subList, _sub1));
+ assertNotNull("Node marked deleted should still be present", getNodeForConsumer(_subList, _sub1));
assertEquals("All 3 nodes are still expected to be present", 3, countNodes(_subList));
}
/**
* Traverses the list nodes in a non-mutating fashion, returning the first node which matches the given
- * Subscription, or null if none is found.
+ * Consumer, or null if none is found.
*/
- private SubscriptionList.SubscriptionNode getNodeForSubscription(final SubscriptionList list, final Subscription sub)
+ private QueueConsumerList.ConsumerNode getNodeForConsumer(final QueueConsumerList list, final Consumer sub)
{
- SubscriptionList.SubscriptionNode node = list.getHead();
- while (node != null && node.getSubscription() != sub)
+ QueueConsumerList.ConsumerNode node = list.getHead();
+ while (node != null && node.getConsumer() != sub)
{
node = node.nextNode();
}
@@ -159,9 +159,9 @@ public class SubscriptionListTest extend
/**
* Counts the number of (non-head) nodes in the list.
*/
- private int countNodes(final SubscriptionList list)
+ private int countNodes(final QueueConsumerList list)
{
- SubscriptionList.SubscriptionNode node = list.getHead();
+ QueueConsumerList.ConsumerNode node = list.getHead();
int count;
for(count = -1; node != null; count++)
{
@@ -172,13 +172,13 @@ public class SubscriptionListTest extend
}
/**
- * Tests that the head is returned as expected, and isn't the node for the first subscription.
+ * Tests that the head is returned as expected, and isn't the node for the first consumer.
*/
public void testGetHead()
{
assertNotNull("List head should be non null", _node);
- assertNotSame("Head should not be node for first subscription",
- _node, getNodeForSubscription(_subList, _sub1));
+ assertNotSame("Head should not be node for first consumer",
+ _node, getNodeForConsumer(_subList, _sub1));
}
/**
@@ -186,13 +186,13 @@ public class SubscriptionListTest extend
*/
public void testGetSize()
{
- SubscriptionList subList = new SubscriptionList();
+ QueueConsumerList subList = new QueueConsumerList();
assertEquals("Unexpected size result", 0, subList.size());
- QueueSubscription sub1 = newMockSubscription();
- QueueSubscription sub2 = newMockSubscription();
- QueueSubscription sub3 = newMockSubscription();
+ QueueConsumer sub1 = newMockConsumer();
+ QueueConsumer sub2 = newMockConsumer();
+ QueueConsumer sub3 = newMockConsumer();
subList.add(sub1);
assertEquals("Unexpected size result", 1, subList.size());
@@ -203,13 +203,13 @@ public class SubscriptionListTest extend
subList.add(sub3);
assertEquals("Unexpected size result", 3, subList.size());
- assertTrue("Removing subscription from list should have succeeded", subList.remove(sub1));
+ assertTrue("Removing consumer from list should have succeeded", subList.remove(sub1));
assertEquals("Unexpected size result", 2, subList.size());
- assertTrue("Removing subscription from list should have succeeded", subList.remove(sub2));
+ assertTrue("Removing consumer from list should have succeeded", subList.remove(sub2));
assertEquals("Unexpected size result", 1, subList.size());
- assertTrue("Removing subscription from list should have succeeded", subList.remove(sub3));
+ assertTrue("Removing consumer from list should have succeeded", subList.remove(sub3));
assertEquals("Unexpected size result", 0, subList.size());
}
@@ -219,12 +219,13 @@ public class SubscriptionListTest extend
*/
public void testRemoveFirstNode()
{
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub1));
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub1));
- assertNull("Should not have been a node present for the removed subscription", getNodeForSubscription(_subList, _sub1));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub1));
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub1));
+ assertNull("Should not have been a node present for the removed consumer",
+ getNodeForConsumer(_subList, _sub1));
assertEquals("Unexpected number of nodes", 2, countNodes(_subList));
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub2));
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub3));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub2));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub3));
}
/**
@@ -233,82 +234,84 @@ public class SubscriptionListTest extend
*/
public void testRemoveCentralNode()
{
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub2));
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub2));
- assertNull("Should not have been a node present for the removed subscription", getNodeForSubscription(_subList, _sub2));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub2));
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub2));
+ assertNull("Should not have been a node present for the removed consumer",
+ getNodeForConsumer(_subList, _sub2));
assertEquals("Unexpected number of nodes", 2, countNodes(_subList));
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub1));
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub3));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub1));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub3));
}
/**
- * Test that if the subscription contained in the last node of the list is removed
+ * Test that if the consumer contained in the last node of the list is removed
* it is no longer present in the node structure of the list at all. However,
* as the last node in the structure can't actually be removed a dummy will instead
* be present.
*/
public void testRemoveLastNode()
{
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub3));
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3));
- assertNull("Should not have been a node present for the removed subscription", getNodeForSubscription(_subList, _sub3));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub3));
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub3));
+ assertNull("Should not have been a node present for the removed consumer",
+ getNodeForConsumer(_subList, _sub3));
//We actually expect 3 nodes to remain this time, because the last node cant be removed for thread safety reasons,
- //however a dummy final node can be used as substitute to allow removal of the subscription node.
+ //however a dummy final node can be used as substitute to allow removal of the consumer node.
assertEquals("Unexpected number of nodes", 2 + 1, countNodes(_subList));
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub1));
- assertNotNull("Should have been a node present for the subscription", getNodeForSubscription(_subList, _sub2));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub1));
+ assertNotNull("Should have been a node present for the consumer", getNodeForConsumer(_subList, _sub2));
}
/**
- * Test that if the subscription not contained in the list is requested to be removed
+ * Test that if the consumer not contained in the list is requested to be removed
* that the removal fails
*/
public void testRemoveNonexistentNode()
{
- QueueSubscription sub4 = newMockSubscription();
- assertNull("Should not have been a node present for the subscription", getNodeForSubscription(_subList, sub4));
- assertFalse("Removing subscription node should not have succeeded", _subList.remove(sub4));
+ QueueConsumer sub4 = newMockConsumer();
+ assertNull("Should not have been a node present for the consumer", getNodeForConsumer(_subList, sub4));
+ assertFalse("Removing consumer node should not have succeeded", _subList.remove(sub4));
assertEquals("Unexpected number of nodes", 3, countNodes(_subList));
}
/**
- * Test that if a subscription node which occurs later in the main list than the marked node is
+ * Test that if a consumer node which occurs later in the main list than the marked node is
* removed from the list after the marked node is also removed, then the marker node doesn't
* serve to retain the subsequent nodes in the list structure (and thus memory) despite their
* removal.
*/
public void testDeletedMarkedNodeDoesntLeakSubsequentlyDeletedNodes()
{
- //get the nodes out the list for the 1st and 3rd subscriptions
- SubscriptionList.SubscriptionNode sub1Node = getNodeForSubscription(_subList, _sub1);
- assertNotNull("Should have been a node present for the subscription", sub1Node);
- SubscriptionList.SubscriptionNode sub3Node = getNodeForSubscription(_subList, _sub3);
- assertNotNull("Should have been a node present for the subscription", sub3Node);
+ //get the nodes out the list for the 1st and 3rd consumers
+ QueueConsumerList.ConsumerNode sub1Node = getNodeForConsumer(_subList, _sub1);
+ assertNotNull("Should have been a node present for the consumer", sub1Node);
+ QueueConsumerList.ConsumerNode sub3Node = getNodeForConsumer(_subList, _sub3);
+ assertNotNull("Should have been a node present for the consumer", sub3Node);
- //mark the first subscription node
+ //mark the first consumer node
assertTrue("should have succeeded in updating the marked node",
- _subList.updateMarkedNode(_subList.getMarkedNode(), sub1Node));
+ _subList.updateMarkedNode(_subList.getMarkedNode(), sub1Node));
- //remove the 1st subscription from the list
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub1));
- //verify the 1st subscription is no longer the marker node (replaced by a dummy), or in the main list structure
+ //remove the 1st consumer from the list
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub1));
+ //verify the 1st consumer is no longer the marker node (replaced by a dummy), or in the main list structure
assertNotSame("Unexpected marker node", sub1Node, _subList.getMarkedNode());
assertNull("Should not have been a node present in the list structure for the marked-but-removed sub1 node",
- getNodeForSubscription(_subList, _sub1));
+ getNodeForConsumer(_subList, _sub1));
- //remove the 2nd subscription from the list
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub2));
+ //remove the 2nd consumer from the list
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub2));
//verify the marker node isn't leaking subsequently removed nodes, by ensuring the very next node
- //in its list structure is now the 3rd subscription (since the 2nd was removed too)
+ //in its list structure is now the 3rd consumer (since the 2nd was removed too)
assertEquals("Unexpected next node", sub3Node, _subList.getMarkedNode().nextNode());
- //remove the 3rd and final/tail subscription
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3));
+ //remove the 3rd and final/tail consumer
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub3));
//verify the marker node isn't leaking subsequently removed nodes, by ensuring the very next node
- //in its list structure is now the dummy tail (since the 3rd subscription was removed, and a dummy
+ //in its list structure is now the dummy tail (since the 3rd consumer was removed, and a dummy
//tail was inserted) and NOT the 3rd sub node.
assertNotSame("Unexpected next node", sub3Node, _subList.getMarkedNode().nextNode());
assertTrue("Unexpected next node", _subList.getMarkedNode().nextNode().isDeleted());
@@ -316,36 +319,36 @@ public class SubscriptionListTest extend
}
/**
- * Test that the marked node 'findNext' behaviour is as expected after a subscription is added
- * to the list following the tail subscription node being removed while it is the marked node.
- * That is, that the new subscriptions node is returned by getMarkedNode().findNext().
+ * Test that the marked node 'findNext' behaviour is as expected after a consumer is added
+ * to the list following the tail consumer node being removed while it is the marked node.
+ * That is, that the new consumers node is returned by getMarkedNode().findNext().
*/
- public void testMarkedNodeFindsNewSubscriptionAfterRemovingTailWhilstMarked()
+ public void testMarkedNodeFindsNewConsumerAfterRemovingTailWhilstMarked()
{
- //get the node out the list for the 3rd subscription
- SubscriptionList.SubscriptionNode sub3Node = getNodeForSubscription(_subList, _sub3);
- assertNotNull("Should have been a node present for the subscription", sub3Node);
+ //get the node out the list for the 3rd consumer
+ QueueConsumerList.ConsumerNode sub3Node = getNodeForConsumer(_subList, _sub3);
+ assertNotNull("Should have been a node present for the consumer", sub3Node);
- //mark the 3rd subscription node
+ //mark the 3rd consumer node
assertTrue("should have succeeded in updating the marked node",
_subList.updateMarkedNode(_subList.getMarkedNode(), sub3Node));
//verify calling findNext on the marked node returns null, i.e. the end of the list has been reached
assertEquals("Unexpected node after marked node", null, _subList.getMarkedNode().findNext());
- //remove the 3rd(marked) subscription from the list
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub3));
+ //remove the 3rd(marked) consumer from the list
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub3));
- //add a new 4th subscription to the list
- QueueSubscription sub4 = newMockSubscription();
+ //add a new 4th consumer to the list
+ QueueConsumer sub4 = newMockConsumer();
_subList.add(sub4);
- //get the node out the list for the 4th subscription
- SubscriptionList.SubscriptionNode sub4Node = getNodeForSubscription(_subList, sub4);
- assertNotNull("Should have been a node present for the subscription", sub4Node);
+ //get the node out the list for the 4th consumer
+ QueueConsumerList.ConsumerNode sub4Node = getNodeForConsumer(_subList, sub4);
+ assertNotNull("Should have been a node present for the consumer", sub4Node);
- //verify the marked node (which is now a dummy substitute for the 3rd subscription) returns
- //the 4th subscriptions node as the next non-deleted node.
+ //verify the marked node (which is now a dummy substitute for the 3rd consumer) returns
+ //the 4th consumers node as the next non-deleted node.
assertEquals("Unexpected next node", sub4Node, _subList.getMarkedNode().findNext());
}
@@ -358,12 +361,12 @@ public class SubscriptionListTest extend
assertTrue("should have succeeded in updating the marked node",
_subList.updateMarkedNode(_subList.getMarkedNode(), null));
- //remove the 1st subscription from the main list
- assertTrue("Removing subscription node should have succeeded", _subList.remove(_sub1));
+ //remove the 1st consumer from the main list
+ assertTrue("Removing consumer node should have succeeded", _subList.remove(_sub1));
- //verify the 1st subscription is no longer in the main list structure
+ //verify the 1st consumer is no longer in the main list structure
assertNull("Should not have been a node present in the main list structure for sub1",
- getNodeForSubscription(_subList, _sub1));
+ getNodeForConsumer(_subList, _sub1));
assertEquals("Unexpected number of nodes", 2, countNodes(_subList));
}
@@ -373,21 +376,21 @@ public class SubscriptionListTest extend
*/
public void testIteratorSkipsFirstDeletedNode()
{
- //'delete' but don't remove the node for the 1st subscription
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub1).delete());
- assertNotNull("Should still have been a node present for the deleted subscription",
- getNodeForSubscription(_subList, _sub1));
+ //'delete' but don't remove the node for the 1st consumer
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub1).delete());
+ assertNotNull("Should still have been a node present for the deleted consumer",
+ getNodeForConsumer(_subList, _sub1));
- SubscriptionList.SubscriptionNodeIterator iter = _subList.iterator();
+ QueueConsumerList.ConsumerNodeIterator iter = _subList.iterator();
- //verify the iterator returns the 2nd subscriptions node
+ //verify the iterator returns the 2nd consumers node
assertTrue("Iterator should have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", _sub2, iter.getNode().getSubscription());
+ assertEquals("Iterator returned unexpected ConsumerNode", _sub2, iter.getNode().getConsumer());
- //verify the iterator returns the 3rd subscriptions node and not the 2nd.
+ //verify the iterator returns the 3rd consumers node and not the 2nd.
assertTrue("Iterator should have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", _sub3, iter.getNode().getSubscription());
+ assertEquals("Iterator returned unexpected ConsumerNode", _sub3, iter.getNode().getConsumer());
}
/**
@@ -396,21 +399,21 @@ public class SubscriptionListTest extend
*/
public void testIteratorSkipsCentralDeletedNode()
{
- //'delete' but don't remove the node for the 2nd subscription
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub2).delete());
- assertNotNull("Should still have been a node present for the deleted subscription",
- getNodeForSubscription(_subList, _sub2));
+ //'delete' but don't remove the node for the 2nd consumer
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub2).delete());
+ assertNotNull("Should still have been a node present for the deleted consumer",
+ getNodeForConsumer(_subList, _sub2));
- SubscriptionList.SubscriptionNodeIterator iter = _subList.iterator();
+ QueueConsumerList.ConsumerNodeIterator iter = _subList.iterator();
- //verify the iterator returns the 1st subscriptions node
+ //verify the iterator returns the 1st consumers node
assertTrue("Iterator should have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", _sub1, iter.getNode().getSubscription());
+ assertEquals("Iterator returned unexpected ConsumerNode", _sub1, iter.getNode().getConsumer());
- //verify the iterator returns the 3rd subscriptions node and not the 2nd.
+ //verify the iterator returns the 3rd consumers node and not the 2nd.
assertTrue("Iterator should have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", _sub3, iter.getNode().getSubscription());
+ assertEquals("Iterator returned unexpected ConsumerNode", _sub3, iter.getNode().getConsumer());
}
/**
@@ -419,24 +422,24 @@ public class SubscriptionListTest extend
*/
public void testIteratorSkipsDeletedFinalNode()
{
- //'delete' but don't remove the node for the 3rd subscription
- assertTrue("Deleting subscription node should have succeeded",
- getNodeForSubscription(_subList, _sub3).delete());
- assertNotNull("Should still have been a node present for the deleted 3rd subscription",
- getNodeForSubscription(_subList, _sub3));
+ //'delete' but don't remove the node for the 3rd consumer
+ assertTrue("Deleting consumer node should have succeeded",
+ getNodeForConsumer(_subList, _sub3).delete());
+ assertNotNull("Should still have been a node present for the deleted 3rd consumer",
+ getNodeForConsumer(_subList, _sub3));
- SubscriptionList.SubscriptionNodeIterator iter = _subList.iterator();
+ QueueConsumerList.ConsumerNodeIterator iter = _subList.iterator();
- //verify the iterator returns the 1st subscriptions node
+ //verify the iterator returns the 1st consumers node
assertTrue("Iterator should have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", _sub1, iter.getNode().getSubscription());
+ assertEquals("Iterator returned unexpected ConsumerNode", _sub1, iter.getNode().getConsumer());
- //verify the iterator returns the 2nd subscriptions node
+ //verify the iterator returns the 2nd consumers node
assertTrue("Iterator should have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", _sub2, iter.getNode().getSubscription());
+ assertEquals("Iterator returned unexpected ConsumerNode", _sub2, iter.getNode().getConsumer());
- //verify the iterator can no longer advance and does not return a subscription node
+ //verify the iterator can no longer advance and does not return a consumer node
assertFalse("Iterator should not have been able to advance", iter.advance());
- assertEquals("Iterator returned unexpected SubscriptionNode", null, iter.getNode());
+ assertEquals("Iterator returned unexpected ConsumerNode", null, iter.getNode());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org