You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/02/12 14:27:57 UTC

svn commit: r1567616 [6/12] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/ qpid/cpp/bindings/qmf2/ruby/ qpid/cpp/bindings/qpid/examples/perl/ qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/lib/qpid/messaging/ qpid/cpp/bindings/qpid/ruby/ qp...

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Feb 12 13:27:51 2014
@@ -18,15 +18,7 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
@@ -43,27 +35,33 @@ import org.apache.qpid.server.binding.Bi
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.QueueActor;
 import org.apache.qpid.server.logging.messages.QueueMessages;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
-import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager;
-import org.apache.qpid.server.subscription.MessageGroupManager;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
+abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements AMQQueue<E, Q, QueueConsumer<?,E,Q,L>>,
+                                       StateChangeListener<QueueConsumer<?,E,Q,L>, QueueConsumer.State>,
+                                       MessageGroupManager.ConsumerResetHelper<E,Q,L>
 {
 
     private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -96,11 +94,11 @@ public class SimpleAMQQueue implements A
     private Exchange _alternateExchange;
 
 
-    private final QueueEntryList<QueueEntry> _entries;
+    private final L _entries;
 
-    private final SubscriptionList _subscriptionList = new SubscriptionList();
+    private final QueueConsumerList<E,Q,L> _consumerList = new QueueConsumerList<E,Q,L>();
 
-    private volatile Subscription _exclusiveSubscriber;
+    private volatile QueueConsumer<?,E,Q,L> _exclusiveSubscriber;
 
 
 
@@ -120,13 +118,7 @@ public class SimpleAMQQueue implements A
     private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
     private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
     private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
-    private final AtomicInteger _consumerCountHigh = new AtomicInteger(0);
-    private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
-    private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
-    private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
-    private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
     private final AtomicLong _unackedMsgCount = new AtomicLong(0);
-    private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
     private final AtomicLong _unackedMsgBytes = new AtomicLong();
 
     private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -165,14 +157,13 @@ public class SimpleAMQQueue implements A
     private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
-    private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+    private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>();
 
 
     private LogSubject _logSubject;
     private LogActor _logActor;
 
-    private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
-    private boolean _nolocal;
+    private boolean _noLocal;
 
     private final AtomicBoolean _overfull = new AtomicBoolean(false);
     private boolean _deleteOnNoConsumers;
@@ -185,20 +176,15 @@ public class SimpleAMQQueue implements A
 
     /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
     private int _maximumDeliveryCount;
-    private final MessageGroupManager _messageGroupManager;
+    private final MessageGroupManager<E,Q,L> _messageGroupManager;
 
-    private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
-            new ArrayList<SubscriptionRegistrationListener>();
+    private final Collection<ConsumerRegistrationListener<Q>> _consumerListeners =
+            new ArrayList<ConsumerRegistrationListener<Q>>();
 
     private AMQQueue.NotificationListener _notificationListener;
     private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
 
 
-    public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
-    {
-        this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
-    }
-
     protected SimpleAMQQueue(UUID id,
                              String name,
                              boolean durable,
@@ -206,7 +192,7 @@ public class SimpleAMQQueue implements A
                              boolean autoDelete,
                              boolean exclusive,
                              VirtualHost virtualHost,
-                             QueueEntryListFactory entryListFactory, Map<String,Object> arguments)
+                             QueueEntryListFactory<E,Q,L> entryListFactory, Map<String,Object> arguments)
     {
 
         if (name == null)
@@ -225,7 +211,7 @@ public class SimpleAMQQueue implements A
         _autoDelete = autoDelete;
         _exclusive = exclusive;
         _virtualHost = virtualHost;
-        _entries = entryListFactory.createQueueEntryList(this);
+        _entries = entryListFactory.createQueueEntryList((Q)this);
         _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments));
 
         _id = id;
@@ -251,13 +237,13 @@ public class SimpleAMQQueue implements A
             {
                 Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
                 _messageGroupManager =
-                        new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
+                        new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
                                 defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
                                 this);
             }
             else
             {
-                _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
+                _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(
                         Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
             }
         }
@@ -289,21 +275,20 @@ public class SimpleAMQQueue implements A
         }
         catch (RejectedExecutionException ree)
         {
-            if (_stopped.get())
-            {
-                // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
-            }
-            else
+            // Ignore when stopped - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped; otherwise log and rethrow.
+            if(!_stopped.get())
             {
                 _logger.error("Unexpected rejected execution", ree);
                 throw ree;
+
             }
+
         }
     }
 
     public void setNoLocal(boolean nolocal)
     {
-        _nolocal = nolocal;
+        _noLocal = nolocal;
     }
 
     public UUID getId()
@@ -388,11 +373,17 @@ public class SimpleAMQQueue implements A
         return _name;
     }
 
-    // ------ Manage Subscriptions
+    // ------ Manage Consumers
+
 
-    public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive)
-            throws AMQSecurityException, ExistingExclusiveSubscription, ExistingSubscriptionPreventsExclusive
+    @Override
+    public synchronized <T extends ConsumerTarget> QueueConsumer<T,E,Q,L> addConsumer(final T target,
+                                     final FilterManager filters,
+                                     final Class<? extends ServerMessage> messageClass,
+                                     final String consumerName,
+                                     EnumSet<Consumer.Option> optionSet) throws AMQException
     {
+
         // Access control
         if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
         {
@@ -400,58 +391,61 @@ public class SimpleAMQQueue implements A
         }
 
 
-        if (hasExclusiveSubscriber())
+        if (hasExclusiveConsumer())
         {
-            throw new ExistingExclusiveSubscription();
+            throw new ExistingExclusiveConsumer();
         }
 
-        if (exclusive && !subscription.isTransient())
+
+        boolean exclusive =  optionSet.contains(Consumer.Option.EXCLUSIVE);
+        boolean isTransient =  optionSet.contains(Consumer.Option.TRANSIENT);
+
+        if (exclusive && !isTransient && getConsumerCount() != 0)
         {
-            if (getConsumerCount() != 0)
-            {
-                throw new ExistingSubscriptionPreventsExclusive();
-            }
-            else
-            {
-                _exclusiveSubscriber = subscription;
-            }
+            throw new ExistingConsumerPreventsExclusive();
+        }
+
+        QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass,
+                                                         optionSet.contains(Consumer.Option.ACQUIRES),
+                                                         optionSet.contains(Consumer.Option.SEES_REQUEUES),
+                                                         consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
+        target.consumerAdded(consumer);
+
+
+        if (exclusive && !isTransient)
+        {
+            _exclusiveSubscriber = consumer;
         }
 
-        if(subscription.isActive())
+        if(consumer.isActive())
         {
             _activeSubscriberCount.incrementAndGet();
         }
-        subscription.setStateListener(this);
-        subscription.setQueueContext(new QueueContext(_entries.getHead()));
+
+        consumer.setStateListener(this);
+        consumer.setQueueContext(new QueueContext<E,Q,L>(_entries.getHead()));
 
         if (!isDeleted())
         {
-            subscription.setQueue(this, exclusive);
-            if(_nolocal)
+            consumer.setQueue((Q)this, exclusive);
+            if(_noLocal)
             {
-                subscription.setNoLocal(_nolocal);
+                consumer.setNoLocal(true);
             }
 
-            synchronized (_subscriptionListeners)
+            synchronized (_consumerListeners)
             {
-                for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+                for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
                 {
-                    listener.subscriptionRegistered(this, subscription);
+                    listener.consumerAdded((Q)this, consumer);
                 }
             }
 
-            _subscriptionList.add(subscription);
-
-            //Increment consumerCountHigh if necessary. (un)registerSubscription are both
-            //synchronized methods so we don't need additional synchronization here
-            if(_consumerCountHigh.get() < getConsumerCount())
-            {
-                _consumerCountHigh.incrementAndGet();
-            }
+            _consumerList.add(consumer);
 
             if (isDeleted())
             {
-                subscription.queueDeleted(this);
+                consumer.queueDeleted();
             }
         }
         else
@@ -459,42 +453,49 @@ public class SimpleAMQQueue implements A
             // TODO
         }
 
-        deliverAsync(subscription);
+        deliverAsync(consumer);
+
+        return consumer;
 
     }
 
-    public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
+    synchronized void unregisterConsumer(final QueueConsumer<?,E,Q,L> consumer) throws AMQException
     {
-        if (subscription == null)
+        if (consumer == null)
         {
-            throw new NullPointerException("subscription argument is null");
+            throw new NullPointerException("consumer argument is null");
         }
 
-        boolean removed = _subscriptionList.remove(subscription);
+        boolean removed = _consumerList.remove(consumer);
 
         if (removed)
         {
-            subscription.close();
+            consumer.close();
             // No longer can the queue have an exclusive consumer
             setExclusiveSubscriber(null);
-            subscription.setQueueContext(null);
+            consumer.setQueueContext(null);
+
+            if(!isDeleted() && isExclusive() && getConsumerCount() == 0)
+            {
+                setAuthorizationHolder(null);
+            }
 
             if(_messageGroupManager != null)
             {
-                resetSubPointersForGroups(subscription, true);
+                resetSubPointersForGroups(consumer, true);
             }
 
-            synchronized (_subscriptionListeners)
+            synchronized (_consumerListeners)
             {
-                for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+                for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
                 {
-                    listener.subscriptionUnregistered(this, subscription);
+                    listener.consumerRemoved((Q)this, consumer);
                 }
             }
 
             // auto-delete queues must be deleted if there are no remaining subscribers
 
-            if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0  )
+            if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0  )
             {
                 if (_logger.isInfoEnabled())
                 {
@@ -503,57 +504,57 @@ public class SimpleAMQQueue implements A
 
                 getVirtualHost().removeQueue(this);
 
-                // we need to manually fire the event to the removed subscription (which was the last one left for this
-                // queue. This is because the delete method uses the subscription set which has just been cleared
-                subscription.queueDeleted(this);
+                // we need to manually fire the event to the removed consumer (which was the last one left for this
+                // queue). This is because the delete method uses the consumer set which has just been cleared
+                consumer.queueDeleted();
             }
         }
 
     }
 
-    public Collection<Subscription> getConsumers()
+    public Collection<QueueConsumer<?,E,Q,L>> getConsumers()
     {
-        List<Subscription> consumers = new ArrayList<Subscription>();
-        SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator();
+        List<QueueConsumer<?,E,Q,L>> consumers = new ArrayList<QueueConsumer<?,E,Q,L>>();
+        QueueConsumerList.ConsumerNodeIterator<E,Q,L> iter = _consumerList.iterator();
         while(iter.advance())
         {
-            consumers.add(iter.getNode().getSubscription());
+            consumers.add(iter.getNode().getConsumer());
         }
         return consumers;
 
     }
 
-    public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+    public void addConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
     {
-        synchronized (_subscriptionListeners)
+        synchronized (_consumerListeners)
         {
-            _subscriptionListeners.add(listener);
+            _consumerListeners.add(listener);
         }
     }
 
-    public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+    public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
     {
-        synchronized (_subscriptionListeners)
+        synchronized (_consumerListeners)
         {
-            _subscriptionListeners.remove(listener);
+            _consumerListeners.remove(listener);
         }
     }
 
-    public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
+    public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments)
     {
-        QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+        E entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
         if(clearAssignments)
         {
-            _messageGroupManager.clearAssignments(subscription);
+            _messageGroupManager.clearAssignments(consumer);
         }
 
         if(entry != null)
         {
-            SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+            QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
             // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
             while (subscriberIter.advance())
             {
-                Subscription sub = subscriberIter.getNode().getSubscription();
+                QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
 
                 // we don't make browsers send the same stuff twice
                 if (sub.seesRequeues())
@@ -591,11 +592,6 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public int getBindingCountHigh()
-    {
-        return _bindingCountHigh.get();
-    }
-
     public void removeBinding(final Binding binding)
     {
         _bindings.remove(binding);
@@ -617,59 +613,45 @@ public class SimpleAMQQueue implements A
     }
 
     // ------ Enqueue / Dequeue
-    public void enqueue(ServerMessage message) throws AMQException
-    {
-        enqueue(message, null);
-    }
 
-    public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+    public void enqueue(ServerMessage message, Action<? super MessageInstance<?, QueueConsumer<?,E,Q,L>>> action) throws AMQException
     {
-        enqueue(message, false, action);
-    }
-
-    public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
-    {
-
-        if(transactional)
-        {
-            incrementTxnEnqueueStats(message);
-        }
         incrementQueueCount();
         incrementQueueSize(message);
 
         _totalMessagesReceived.incrementAndGet();
 
 
-        QueueEntry entry;
-        final Subscription exclusiveSub = _exclusiveSubscriber;
+        E entry;
+        final QueueConsumer<?,E,Q,L> exclusiveSub = _exclusiveSubscriber;
         entry = _entries.add(message);
 
         if(action != null || (exclusiveSub == null  && _queueRunner.isIdle()))
         {
             /*
 
-            iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+            iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
 
              */
-            SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
-            SubscriptionList.SubscriptionNode nextNode = node.findNext();
+            QueueConsumerList.ConsumerNode<E,Q,L> node = _consumerList.getMarkedNode();
+            QueueConsumerList.ConsumerNode<E,Q,L> nextNode = node.findNext();
             if (nextNode == null)
             {
-                nextNode = _subscriptionList.getHead().findNext();
+                nextNode = _consumerList.getHead().findNext();
             }
             while (nextNode != null)
             {
-                if (_subscriptionList.updateMarkedNode(node, nextNode))
+                if (_consumerList.updateMarkedNode(node, nextNode))
                 {
                     break;
                 }
                 else
                 {
-                    node = _subscriptionList.getMarkedNode();
+                    node = _consumerList.getMarkedNode();
                     nextNode = node.findNext();
                     if (nextNode == null)
                     {
-                        nextNode = _subscriptionList.getHead().findNext();
+                        nextNode = _consumerList.getHead().findNext();
                     }
                 }
             }
@@ -683,13 +665,13 @@ public class SimpleAMQQueue implements A
                 if (nextNode == null)
                 {
                     loops--;
-                    nextNode = _subscriptionList.getHead();
+                    nextNode = _consumerList.getHead();
                 }
                 else
                 {
-                    // if subscription at end, and active, offer
-                    Subscription sub = nextNode.getSubscription();
-                    deliverToSubscription(sub, entry);
+                    // if consumer at end, and active, offer
+                    QueueConsumer<?,E,Q,L> sub = nextNode.getConsumer();
+                    deliverToConsumer(sub, entry);
                 }
                 nextNode = nextNode.findNext();
 
@@ -699,7 +681,7 @@ public class SimpleAMQQueue implements A
 
         if (entry.isAvailable())
         {
-            checkSubscriptionsNotAheadOfDelivery(entry);
+            checkConsumersNotAheadOfDelivery(entry);
 
             if (exclusiveSub != null)
             {
@@ -715,12 +697,12 @@ public class SimpleAMQQueue implements A
 
         if(action != null)
         {
-            action.onEnqueue(entry);
+            action.performAction(entry);
         }
 
     }
 
-    private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
+    private void deliverToConsumer(final QueueConsumer<?,E,Q,L> sub, final E entry)
             throws AMQException
     {
 
@@ -729,14 +711,14 @@ public class SimpleAMQQueue implements A
             try
             {
                 if (!sub.isSuspended()
-                    && subscriptionReadyAndHasInterest(sub, entry)
+                    && consumerReadyAndHasInterest(sub, entry)
                     && mightAssign(sub, entry)
                     && !sub.wouldSuspend(entry))
                 {
                     if (sub.acquires() && !assign(sub, entry))
                     {
                         // restore credit here that would have been taken away by wouldSuspend since we didn't manage
-                        // to acquire the entry for this subscription
+                        // to acquire the entry for this consumer
                         sub.restoreCredit(entry);
                     }
                     else
@@ -752,7 +734,7 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    private boolean assign(final Subscription sub, final QueueEntry entry)
+    private boolean assign(final QueueConsumer<?,E,Q,L> sub, final E entry)
     {
         if(_messageGroupManager == null)
         {
@@ -766,17 +748,17 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    private boolean mightAssign(final Subscription sub, final QueueEntry entry)
+    private boolean mightAssign(final QueueConsumer<?,E,Q,L> sub, final E entry)
     {
         if(_messageGroupManager == null || !sub.acquires())
         {
             return true;
         }
-        Subscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+        QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
         return (assigned == null) || (assigned == sub);
     }
 
-    protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+    protected void checkConsumersNotAheadOfDelivery(final E entry)
     {
         // This method is only required for queues which mess with ordering
         // Simple Queues don't :-)
@@ -810,19 +792,7 @@ public class SimpleAMQQueue implements A
         getAtomicQueueCount().incrementAndGet();
     }
 
-    private void incrementTxnEnqueueStats(final ServerMessage message)
-    {
-        _msgTxnEnqueues.incrementAndGet();
-        _byteTxnEnqueues.addAndGet(message.getSize());
-    }
-
-    private void incrementTxnDequeueStats(QueueEntry entry)
-    {
-        _msgTxnDequeues.incrementAndGet();
-        _byteTxnDequeues.addAndGet(entry.getSize());
-    }
-
-    private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch)
+    private void deliverMessage(final QueueConsumer<?,E,Q,L> sub, final E entry, boolean batch)
             throws AMQException
     {
         setLastSeenEntry(sub, entry);
@@ -833,18 +803,18 @@ public class SimpleAMQQueue implements A
         sub.send(entry, batch);
     }
 
-    private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
+    private boolean consumerReadyAndHasInterest(final QueueConsumer<?,E,Q,L> sub, final E entry) throws AMQException
     {
         return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
     }
 
 
-    private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
+    private void setLastSeenEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
     {
-        QueueContext subContext = (QueueContext) sub.getQueueContext();
+        QueueContext<E,Q,L> subContext = sub.getQueueContext();
         if (subContext != null)
         {
-            QueueEntry releasedEntry = subContext.getReleasedEntry();
+            E releasedEntry = subContext.getReleasedEntry();
 
             QueueContext._lastSeenUpdater.set(subContext, entry);
             if(releasedEntry == entry)
@@ -854,13 +824,13 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry)
+    private void updateSubRequeueEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
     {
 
-        QueueContext subContext = (QueueContext) sub.getQueueContext();
+        QueueContext<E,Q,L> subContext = sub.getQueueContext();
         if(subContext != null)
         {
-            QueueEntry oldEntry;
+            E oldEntry;
 
             while((oldEntry  = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0)
             {
@@ -872,13 +842,13 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public void requeue(QueueEntry entry)
+    public void requeue(E entry)
     {
-        SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+        QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
         // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
         while (subscriberIter.advance() && entry.isAvailable())
         {
-            Subscription sub = subscriberIter.getNode().getSubscription();
+            QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
 
             // we don't make browsers send the same stuff twice
             if (sub.seesRequeues())
@@ -891,25 +861,21 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void dequeue(QueueEntry entry, Subscription sub)
+    @Override
+    public void dequeue(E entry)
     {
         decrementQueueCount();
         decrementQueueSize(entry);
-        if (entry.acquiredBySubscription())
+        if (entry.acquiredByConsumer())
         {
             _deliveredMessages.decrementAndGet();
         }
 
-        if(sub != null && sub.isSessionTransactional())
-        {
-            incrementTxnDequeueStats(entry);
-        }
-
         checkCapacity();
 
     }
 
-    private void decrementQueueSize(final QueueEntry entry)
+    private void decrementQueueSize(final E entry)
     {
         final ServerMessage message = entry.getMessage();
         long size = message.getSize();
@@ -928,17 +894,17 @@ public class SimpleAMQQueue implements A
         _dequeueCount.incrementAndGet();
     }
 
-    public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
+    public boolean resend(final E entry, final QueueConsumer<?,E,Q,L> consumer) throws AMQException
     {
-        /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
-                  entry to resend and move back the subscription pointer. */
+        /* TODO : This is wrong as the consumer may be suspended; we should instead change the state of the message
+                  entry to resend and move back the consumer pointer. */
 
-        subscription.getSendLock();
+        consumer.getSendLock();
         try
         {
-            if (!subscription.isClosed())
+            if (!consumer.isClosed())
             {
-                deliverMessage(subscription, entry, false);
+                deliverMessage(consumer, entry, false);
                 return true;
             }
             else
@@ -948,7 +914,7 @@ public class SimpleAMQQueue implements A
         }
         finally
         {
-            subscription.releaseSendLock();
+            consumer.releaseSendLock();
         }
     }
 
@@ -956,12 +922,7 @@ public class SimpleAMQQueue implements A
 
     public int getConsumerCount()
     {
-        return _subscriptionList.size();
-    }
-
-    public int getConsumerCountHigh()
-    {
-        return _consumerCountHigh.get();
+        return _consumerList.size();
     }
 
     public int getActiveConsumerCount()
@@ -1009,11 +970,11 @@ public class SimpleAMQQueue implements A
 
     public long getOldestMessageArrivalTime()
     {
-        QueueEntry entry = getOldestQueueEntry();
+        E entry = getOldestQueueEntry();
         return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
     }
 
-    protected QueueEntry getOldestQueueEntry()
+    protected E getOldestQueueEntry()
     {
         return _entries.next(_entries.getHead());
     }
@@ -1023,13 +984,13 @@ public class SimpleAMQQueue implements A
         return _deleted.get();
     }
 
-    public List<QueueEntry> getMessagesOnTheQueue()
+    public List<E> getMessagesOnTheQueue()
     {
-        ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
-        QueueEntryIterator queueListIterator = _entries.iterator();
+        ArrayList<E> entryList = new ArrayList<E>();
+        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
         while (queueListIterator.advance())
         {
-            QueueEntry node = queueListIterator.getNode();
+            E node = queueListIterator.getNode();
             if (node != null && !node.isDeleted())
             {
                 entryList.add(node);
@@ -1039,16 +1000,16 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
+    public void stateChanged(QueueConsumer<?,E,Q,L> sub, QueueConsumer.State oldState, QueueConsumer.State newState)
     {
-        if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
+        if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE)
         {
             _activeSubscriberCount.decrementAndGet();
 
         }
-        else if (newState == Subscription.State.ACTIVE)
+        else if (newState == QueueConsumer.State.ACTIVE)
         {
-            if (oldState != Subscription.State.ACTIVE)
+            if (oldState != QueueConsumer.State.ACTIVE)
             {
                 _activeSubscriberCount.incrementAndGet();
 
@@ -1057,7 +1018,7 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public int compareTo(final AMQQueue o)
+    public int compareTo(final Q o)
     {
         return _name.compareTo(o.getName());
     }
@@ -1072,12 +1033,12 @@ public class SimpleAMQQueue implements A
         return _atomicQueueSize;
     }
 
-    public boolean hasExclusiveSubscriber()
+    public boolean hasExclusiveConsumer()
     {
         return _exclusiveSubscriber != null;
     }
 
-    private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
+    private void setExclusiveSubscriber(QueueConsumer<?,E,Q,L> exclusiveSubscriber)
     {
         _exclusiveSubscriber = exclusiveSubscriber;
     }
@@ -1088,32 +1049,32 @@ public class SimpleAMQQueue implements A
     }
 
     /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
-    protected QueueEntryList getEntries()
+    protected L getEntries()
     {
         return _entries;
     }
 
-    protected SubscriptionList getSubscriptionList()
+    protected QueueConsumerList<E,Q,L> getConsumerList()
     {
-        return _subscriptionList;
+        return _consumerList;
     }
 
 
-    public static interface QueueEntryFilter
+    public static interface QueueEntryFilter<E extends QueueEntry>
     {
-        public boolean accept(QueueEntry entry);
+        public boolean accept(E entry);
 
         public boolean filterComplete();
     }
 
 
 
-    public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
+    public List<E> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
     {
-        return getMessagesOnTheQueue(new QueueEntryFilter()
+        return getMessagesOnTheQueue(new QueueEntryFilter<E>()
         {
 
-            public boolean accept(QueueEntry entry)
+            public boolean accept(E entry)
             {
                 final long messageId = entry.getMessage().getMessageNumber();
                 return messageId >= fromMessageId && messageId <= toMessageId;
@@ -1126,13 +1087,13 @@ public class SimpleAMQQueue implements A
         });
     }
 
-    public QueueEntry getMessageOnTheQueue(final long messageId)
+    public E getMessageOnTheQueue(final long messageId)
     {
-        List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+        List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
         {
             private boolean _complete;
 
-            public boolean accept(QueueEntry entry)
+            public boolean accept(E entry)
             {
                 _complete = entry.getMessage().getMessageNumber() == messageId;
                 return _complete;
@@ -1146,13 +1107,13 @@ public class SimpleAMQQueue implements A
         return entries.isEmpty() ? null : entries.get(0);
     }
 
-    public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
+    public List<E> getMessagesOnTheQueue(QueueEntryFilter<E> filter)
     {
-        ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
-        QueueEntryIterator queueListIterator = _entries.iterator();
+        ArrayList<E> entryList = new ArrayList<E>();
+        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
         while (queueListIterator.advance() && !filter.filterComplete())
         {
-            QueueEntry node = queueListIterator.getNode();
+            E node = queueListIterator.getNode();
             if (!node.isDeleted() && filter.accept(node))
             {
                 entryList.add(node);
@@ -1162,13 +1123,13 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void visit(final QueueEntryVisitor visitor)
+    public void visit(final QueueEntryVisitor<E> visitor)
     {
-        QueueEntryIterator queueListIterator = _entries.iterator();
+        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
 
         while(queueListIterator.advance())
         {
-            QueueEntry node = queueListIterator.getNode();
+            E node = queueListIterator.getNode();
 
             if(!node.isDeleted())
             {
@@ -1185,17 +1146,17 @@ public class SimpleAMQQueue implements A
      *
      * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
      * Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
-     * @param fromPosition
-     * @param toPosition
-     * @return
+     * @param fromPosition first message position
+     * @param toPosition last message position
+     * @return list of messages
      */
-    public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
+    public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
     {
-        return getMessagesOnTheQueue(new QueueEntryFilter()
+        return getMessagesOnTheQueue(new QueueEntryFilter<E>()
                                         {
                                             private long position = 0;
 
-                                            public boolean accept(QueueEntry entry)
+                                            public boolean accept(E entry)
                                             {
                                                 position++;
                                                 return (position >= fromPosition) && (position <= toPosition);
@@ -1224,12 +1185,12 @@ public class SimpleAMQQueue implements A
     // TODO - now only used by the tests
     public void deleteMessageFromTop()
     {
-        QueueEntryIterator queueListIterator = _entries.iterator();
+        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
         boolean noDeletes = true;
 
         while (noDeletes && queueListIterator.advance())
         {
-            QueueEntry node = queueListIterator.getNode();
+            E node = queueListIterator.getNode();
             if (node.acquire())
             {
                 dequeueEntry(node);
@@ -1252,14 +1213,14 @@ public class SimpleAMQQueue implements A
             throw new AMQSecurityException("Permission denied: queue " + getName());
         }
 
-        QueueEntryIterator queueListIterator = _entries.iterator();
+        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
         long count = 0;
 
         ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
 
         while (queueListIterator.advance())
         {
-            QueueEntry node = queueListIterator.getNode();
+            E node = queueListIterator.getNode();
             if (node.acquire())
             {
                 dequeueEntry(node, txn);
@@ -1276,13 +1237,13 @@ public class SimpleAMQQueue implements A
         return count;
     }
 
-    private void dequeueEntry(final QueueEntry node)
+    private void dequeueEntry(final E node)
     {
         ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore());
         dequeueEntry(node, txn);
     }
 
-    private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
+    private void dequeueEntry(final E node, ServerTransaction txn)
     {
         txn.dequeue(this, node.getMessage(),
                     new ServerTransaction.Action()
@@ -1300,18 +1261,18 @@ public class SimpleAMQQueue implements A
                     });
     }
 
-    public void addQueueDeleteTask(final Task task)
+    public void addQueueDeleteTask(final Action<AMQQueue> task)
     {
         _deleteTaskList.add(task);
     }
 
-    public void removeQueueDeleteTask(final Task task)
+    public void removeQueueDeleteTask(final Action<AMQQueue> task)
     {
         _deleteTaskList.remove(task);
     }
 
     // TODO list all thrown exceptions
-    public int delete() throws AMQSecurityException, AMQException
+    public int delete() throws AMQException
     {
         // Check access
         if (!_virtualHost.getSecurityManager().authoriseDelete(this))
@@ -1322,27 +1283,29 @@ public class SimpleAMQQueue implements A
         if (!_deleted.getAndSet(true))
         {
 
-            for (Binding b : _bindings)
+            final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings);
+
+            for (Binding b : bindingCopy)
             {
                 b.getExchange().removeBinding(b);
             }
 
-            SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+            QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
 
-            while (subscriptionIter.advance())
+            while (consumerNodeIterator.advance())
             {
-                Subscription s = subscriptionIter.getNode().getSubscription();
+                QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
                 if (s != null)
                 {
-                    s.queueDeleted(this);
+                    s.queueDeleted();
                 }
             }
 
 
-            List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+            List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
             {
 
-                public boolean accept(QueueEntry entry)
+                public boolean accept(E entry)
                 {
                     return entry.acquire();
                 }
@@ -1356,7 +1319,7 @@ public class SimpleAMQQueue implements A
             ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
 
 
-            for(final QueueEntry entry : entries)
+            for(final E entry : entries)
             {
                 // TODO log requeues with a post enqueue action
                 int requeues = entry.routeToAlternate(null, txn);
@@ -1375,9 +1338,9 @@ public class SimpleAMQQueue implements A
             }
 
 
-            for (Task task : _deleteTaskList)
+            for (Action<AMQQueue> task : _deleteTaskList)
             {
-                task.doTask(this);
+                task.performAction(this);
             }
 
             _deleteTaskList.clear();
@@ -1461,7 +1424,7 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void deliverAsync(Subscription sub)
+    public void deliverAsync(QueueConsumer<?,E,Q,L> sub)
     {
         if(_exclusiveSubscriber == null)
         {
@@ -1469,28 +1432,23 @@ public class SimpleAMQQueue implements A
         }
         else
         {
-            SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
-            if(flusher == null)
-            {
-                flusher = new SubFlushRunner(sub);
-                sub.set(SUB_FLUSH_RUNNER, flusher);
-            }
+            SubFlushRunner flusher = sub.getRunner();
             flusher.execute(_asyncDelivery);
         }
 
     }
 
-    public void flushSubscription(Subscription sub) throws AMQException
+    void flushConsumer(QueueConsumer<?,E,Q,L> sub) throws AMQException
     {
         // Access control
         if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
         {
             throw new AMQSecurityException("Permission denied: " + getName());
         }
-        flushSubscription(sub, Long.MAX_VALUE);
+        flushConsumer(sub, Long.MAX_VALUE);
     }
 
-    public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
+    boolean flushConsumer(QueueConsumer<?,E,Q,L> sub, long iterations) throws AMQException
     {
         boolean atTail = false;
         final boolean keepSendLockHeld = iterations <=  SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
@@ -1546,29 +1504,29 @@ public class SimpleAMQQueue implements A
         }
 
 
-        // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
+        // if there's (potentially) more than one consumer the others will potentially not have been advanced to the
         // next entry they are interested in yet.  This would lead to holding on to references to expired messages, etc
         // which would give us memory "leak".
 
-        if (!hasExclusiveSubscriber())
+        if (!hasExclusiveConsumer())
         {
-            advanceAllSubscriptions();
+            advanceAllConsumers();
         }
         return atTail;
     }
 
     /**
-     * Attempt delivery for the given subscription.
+     * Attempt delivery for the given consumer.
      *
-     * Looks up the next node for the subscription and attempts to deliver it.
+     * Looks up the next node for the consumer and attempts to deliver it.
      *
      *
-     * @param sub
-     * @param batch
+     * @param sub the consumer
+     * @param batch true if processing can be batched
      * @return true if we have completed all possible deliveries for this sub.
      * @throws AMQException
      */
-    private boolean attemptDelivery(Subscription sub, boolean batch) throws AMQException
+    private boolean attemptDelivery(QueueConsumer<?,E,Q,L> sub, boolean batch) throws AMQException
     {
         boolean atTail = false;
 
@@ -1576,7 +1534,7 @@ public class SimpleAMQQueue implements A
         if (subActive)
         {
 
-            QueueEntry node  = getNextAvailableEntry(sub);
+            E node  = getNextAvailableEntry(sub);
 
             if (node != null && node.isAvailable())
             {
@@ -1587,7 +1545,7 @@ public class SimpleAMQQueue implements A
                         if (sub.acquires() && !assign(sub, node))
                         {
                             // restore credit here that would have been taken away by wouldSuspend since we didn't manage
-                            // to acquire the entry for this subscription
+                            // to acquire the entry for this consumer
                             sub.restoreCredit(node);
                         }
                         else
@@ -1598,7 +1556,7 @@ public class SimpleAMQQueue implements A
                     }
                     else // Not enough Credit for message and wouldSuspend
                     {
-                        //QPID-1187 - Treat the subscription as suspended for this message
+                        //QPID-1187 - Treat the consumer as suspended for this message
                         // and wait for the message to be removed to continue delivery.
                         subActive = false;
                         node.addStateChangeListener(new QueueEntryListener(sub));
@@ -1611,13 +1569,13 @@ public class SimpleAMQQueue implements A
         return atTail || !subActive;
     }
 
-    protected void advanceAllSubscriptions() throws AMQException
+    protected void advanceAllConsumers() throws AMQException
     {
-        SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
-        while (subscriberIter.advance())
+        QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
+        while (consumerNodeIterator.advance())
         {
-            SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
-            Subscription sub = subNode.getSubscription();
+            QueueConsumerList.ConsumerNode<E,Q,L> subNode = consumerNodeIterator.getNode();
+            QueueConsumer<?,E,Q,L> sub = subNode.getConsumer();
             if(sub.acquires())
             {
                 getNextAvailableEntry(sub);
@@ -1629,16 +1587,16 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    private QueueEntry getNextAvailableEntry(final Subscription sub)
+    private E getNextAvailableEntry(final QueueConsumer<?,E,Q,L> sub)
             throws AMQException
     {
-        QueueContext context = (QueueContext) sub.getQueueContext();
+        QueueContext<E,Q,L> context = sub.getQueueContext();
         if(context != null)
         {
-            QueueEntry lastSeen = context.getLastSeenEntry();
-            QueueEntry releasedNode = context.getReleasedEntry();
+            E lastSeen = context.getLastSeenEntry();
+            E releasedNode = context.getReleasedEntry();
 
-            QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+            E node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
 
             boolean expired = false;
             while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
@@ -1670,12 +1628,12 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub)
+    public boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub)
     {
-        QueueContext context = (QueueContext) sub.getQueueContext();
+        QueueContext<E,Q,L> context = sub.getQueueContext();
         if(context != null)
         {
-            QueueEntry releasedNode = context.getReleasedEntry();
+            E releasedNode = context.getReleasedEntry();
             return releasedNode != null && releasedNode.compareTo(entry) < 0;
         }
         else
@@ -1689,14 +1647,14 @@ public class SimpleAMQQueue implements A
      *
      * A queue Runner is started whenever a state change occurs, e.g when a new
      * message arrives on the queue and cannot be immediately delivered to a
-     * subscription (i.e. asynchronous delivery is required). Unless there are
-     * SubFlushRunners operating (due to subscriptions unsuspending) which are
+     * consumer (i.e. asynchronous delivery is required). Unless there are
+     * SubFlushRunners operating (due to consumers unsuspending) which are
      * capable of accepting/delivering all messages then these messages would
      * otherwise remain on the queue.
      *
      * processQueue should be running while there are messages on the queue AND
-     * there are subscriptions that can deliver them. If there are no
-     * subscriptions capable of delivering the remaining messages on the queue
+     * there are consumers that can deliver them. If there are no
+     * consumers capable of delivering the remaining messages on the queue
      * then processQueue should stop to prevent spinning.
      *
      * Since processQueue runs in a fixed size Executor, it should not run
@@ -1712,7 +1670,7 @@ public class SimpleAMQQueue implements A
      */
     public long processQueue(QueueRunner runner) throws AMQException
     {
-        long stateChangeCount = Long.MIN_VALUE;
+        long stateChangeCount;
         long previousStateChangeCount = Long.MIN_VALUE;
         long rVal = Long.MIN_VALUE;
         boolean deliveryIncomplete = true;
@@ -1720,7 +1678,7 @@ public class SimpleAMQQueue implements A
         boolean lastLoop = false;
         int iterations = MAX_ASYNC_DELIVERIES;
 
-        final int numSubs = _subscriptionList.size();
+        final int numSubs = _consumerList.size();
 
         final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
 
@@ -1731,8 +1689,8 @@ public class SimpleAMQQueue implements A
         // So whilst delivery/rejection is going on a processQueue thread will be running
         while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
         {
-            // we want to have one extra loop after every subscription has reached the point where it cannot move
-            // further, just in case the advance of one subscription in the last loop allows a different subscription to
+            // we want to have one extra loop after every consumer has reached the point where it cannot move
+            // further, just in case the advance of one consumer in the last loop allows a different consumer to
             // move forward in the next iteration
 
             if (previousStateChangeCount != stateChangeCount)
@@ -1744,14 +1702,14 @@ public class SimpleAMQQueue implements A
             }
 
             previousStateChangeCount = stateChangeCount;
-            boolean allSubscriptionsDone = true;
-            boolean subscriptionDone;
+            boolean allConsumersDone = true;
+            boolean consumerDone;
 
-            SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+            QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
             //iterate over the subscribers and try to advance their pointer
-            while (subscriptionIter.advance())
+            while (consumerNodeIterator.advance())
             {
-                Subscription sub = subscriptionIter.getNode().getSubscription();
+                QueueConsumer<?,E,Q,L> sub = consumerNodeIterator.getNode().getConsumer();
                 sub.getSendLock();
 
                     try
@@ -1759,8 +1717,8 @@ public class SimpleAMQQueue implements A
                         for(int i = 0 ; i < perSub; i++)
                         {
                             //attempt delivery. returns true if no further delivery currently possible to this sub
-                            subscriptionDone = attemptDelivery(sub, true);
-                            if (subscriptionDone)
+                            consumerDone = attemptDelivery(sub, true);
+                            if (consumerDone)
                             {
                                 sub.flushBatched();
                                 if (lastLoop && !sub.isSuspended())
@@ -1771,9 +1729,9 @@ public class SimpleAMQQueue implements A
                             }
                             else
                             {
-                                //this subscription can accept additional deliveries, so we must
+                                //this consumer can accept additional deliveries, so we must
                                 //keep going after this (if iteration slicing allows it)
-                                allSubscriptionsDone = false;
+                                allConsumersDone = false;
                                 lastLoop = false;
                                 if(--iterations == 0)
                                 {
@@ -1792,24 +1750,24 @@ public class SimpleAMQQueue implements A
                     }
             }
 
-            if(allSubscriptionsDone && lastLoop)
+            if(allConsumersDone && lastLoop)
             {
                 //We have done an extra loop already and there are
                 //again no further delivery attempts possible, only
                 //keep going if state change demands it.
                 deliveryIncomplete = false;
             }
-            else if(allSubscriptionsDone)
+            else if(allConsumersDone)
             {
-                //All subscriptions reported being done, but we have to do
+                //All consumers reported being done, but we have to do
                 //an extra loop if the iterations are not exhausted and
                 //there is still any work to be done
-                deliveryIncomplete = _subscriptionList.size() != 0;
+                deliveryIncomplete = _consumerList.size() != 0;
                 lastLoop = true;
             }
             else
             {
-                //some subscriptions can still accept more messages,
+                //some consumers can still accept more messages,
                 //keep going if iteration count allows.
                 lastLoop = false;
                 deliveryIncomplete = true;
@@ -1833,11 +1791,11 @@ public class SimpleAMQQueue implements A
 
     public void checkMessageStatus() throws AMQException
     {
-        QueueEntryIterator queueListIterator = _entries.iterator();
+        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
 
         while (queueListIterator.advance())
         {
-            QueueEntry node = queueListIterator.getNode();
+            E node = queueListIterator.getNode();
             // Only process nodes that are not currently deleted and not dequeued
             if (!node.isDeleted())
             {
@@ -1984,12 +1942,12 @@ public class SimpleAMQQueue implements A
         return _notificationChecks;
     }
 
-    private final class QueueEntryListener implements QueueEntry.StateChangeListener
+    private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State>
     {
 
-        private final Subscription _sub;
+        private final QueueConsumer<?,E,Q,L> _sub;
 
-        public QueueEntryListener(final Subscription sub)
+        public QueueEntryListener(final QueueConsumer<?,E,Q,L> sub)
         {
             _sub = sub;
         }
@@ -2005,7 +1963,7 @@ public class SimpleAMQQueue implements A
             return System.identityHashCode(_sub);
         }
 
-        public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
+        public void stateChanged(E entry, QueueEntry.State oldSate, QueueEntry.State newState)
         {
             entry.removeStateChangeListener(this);
             deliverAsync(_sub);
@@ -2076,26 +2034,6 @@ public class SimpleAMQQueue implements A
         return _dequeueSize.get();
     }
 
-    public long getByteTxnEnqueues()
-    {
-        return _byteTxnEnqueues.get();
-    }
-
-    public long getByteTxnDequeues()
-    {
-        return _byteTxnDequeues.get();
-    }
-
-    public long getMsgTxnEnqueues()
-    {
-        return _msgTxnEnqueues.get();
-    }
-
-    public long getMsgTxnDequeues()
-    {
-        return _msgTxnDequeues.get();
-    }
-
     public long getPersistentByteEnqueues()
     {
         return _persistentMessageEnqueueSize.get();
@@ -2123,11 +2061,6 @@ public class SimpleAMQQueue implements A
         return getName();
     }
 
-    public long getUnackedMessageCountHigh()
-    {
-        return _unackedMsgCountHigh.get();
-    }
-
     public long getUnackedMessageCount()
     {
         return _unackedMsgCount.get();
@@ -2138,25 +2071,16 @@ public class SimpleAMQQueue implements A
         return _unackedMsgBytes.get();
     }
 
-    public void decrementUnackedMsgCount(QueueEntry queueEntry)
+    public void decrementUnackedMsgCount(E queueEntry)
     {
         _unackedMsgCount.decrementAndGet();
         _unackedMsgBytes.addAndGet(-queueEntry.getSize());
     }
 
-    private void incrementUnackedMsgCount(QueueEntry entry)
+    private void incrementUnackedMsgCount(E entry)
     {
-        long unackedMsgCount = _unackedMsgCount.incrementAndGet();
+        _unackedMsgCount.incrementAndGet();
         _unackedMsgBytes.addAndGet(entry.getSize());
-
-        long unackedMsgCountHigh;
-        while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
-        {
-            if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount))
-            {
-                break;
-            }
-        }
     }
 
     public LogActor getLogActor()
@@ -2224,4 +2148,39 @@ public class SimpleAMQQueue implements A
         return (String) _arguments.get(Queue.DESCRIPTION);
     }
 
+    public final  <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
+                              final InstanceProperties instanceProperties,
+                              final ServerTransaction txn,
+                              final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
+    {
+            txn.enqueue(this,message, new ServerTransaction.Action()
+            {
+                MessageReference _reference = message.newReference();
+
+                public void postCommit()
+                {
+                    try
+                    {
+                        SimpleAMQQueue.this.enqueue(message, postEnqueueAction);
+                    }
+                    catch (AMQException e)
+                    {
+                        // TODO
+                        throw new RuntimeException(e);
+                    }
+                    finally
+                    {
+                        _reference.release();
+                    }
+                }
+
+                public void onRollback()
+                {
+                    _reference.release();
+                }
+            });
+            return 1;
+
+    }
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Wed Feb 12 13:27:51 2014
@@ -20,16 +20,19 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
 import java.util.UUID;
 
-public class SortedQueue extends OutOfOrderQueue
+public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
 {
     //Lock object to synchronize enqueue. Used instead of the object
-    //monitor to prevent lock order issues with subscription sendLocks
+    //monitor to prevent lock order issues with consumer sendLocks
     //and consumer updates in the super classes
     private final Object _sortedQueueLock = new Object();
     private final String _sortedPropertyName;
@@ -38,17 +41,33 @@ public class SortedQueue extends OutOfOr
                             final boolean durable, final String owner, final boolean autoDelete,
                             final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
     {
+        this(id, name, durable, owner, autoDelete, exclusive,
+             virtualHost, arguments, sortedPropertyName, new SortedQueueEntryListFactory(sortedPropertyName));
+    }
+
+
+    protected SortedQueue(UUID id, final String name,
+                          final boolean durable, final String owner, final boolean autoDelete,
+                          final boolean exclusive, final VirtualHost virtualHost,
+                          Map<String, Object> arguments,
+                          String sortedPropertyName,
+                          QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList> factory)
+    {
         super(id, name, durable, owner, autoDelete, exclusive,
-                virtualHost, new SortedQueueEntryListFactory(sortedPropertyName), arguments);
+              virtualHost, factory, arguments);
         this._sortedPropertyName = sortedPropertyName;
     }
 
+
     public String getSortedPropertyName()
     {
         return _sortedPropertyName;
     }
 
-    public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+    @Override
+    public void enqueue(final ServerMessage message,
+                        final Action<? super MessageInstance<?, QueueConsumer<?, SortedQueueEntry, SortedQueue, SortedQueueEntryList>>> action)
+            throws AMQException
     {
         synchronized (_sortedQueueLock)
         {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Wed Feb 12 13:27:51 2014
@@ -21,37 +21,37 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
+import org.apache.qpid.server.queue.SortedQueueEntry.Colour;
 
 /**
  * A sorted implementation of QueueEntryList.
  * Uses the red/black tree algorithm specified in "Introduction to Algorithms".
  * ISBN-10: 0262033844
  * ISBN-13: 978-0262033848
- * @see http://en.wikipedia.org/wiki/Red-black_tree
+ * see http://en.wikipedia.org/wiki/Red-black_tree
  */
-public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl>
+public class SortedQueueEntryList implements SimpleQueueEntryList<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
 {
-    private final SortedQueueEntryImpl _head;
-    private SortedQueueEntryImpl _root;
+    private final SortedQueueEntry _head;
+    private SortedQueueEntry _root;
     private long _entryId = Long.MIN_VALUE;
     private final Object _lock = new Object();
-    private final AMQQueue _queue;
+    private final SortedQueue _queue;
     private final String _propertyName;
 
-    public SortedQueueEntryList(final AMQQueue queue, final String propertyName)
+    public SortedQueueEntryList(final SortedQueue queue, final String propertyName)
     {
         _queue = queue;
-        _head = new SortedQueueEntryImpl(this);
+        _head = new SortedQueueEntry(this);
         _propertyName = propertyName;
     }
 
-    public AMQQueue getQueue()
+    public SortedQueue getQueue()
     {
         return _queue;
     }
 
-    public SortedQueueEntryImpl add(final ServerMessage message)
+    public SortedQueueEntry add(final ServerMessage message)
     {
         synchronized(_lock)
         {
@@ -62,7 +62,7 @@ public class SortedQueueEntryList implem
                 key = val.toString();
             }
 
-            final SortedQueueEntryImpl entry = new SortedQueueEntryImpl(this,message, ++_entryId);
+            final SortedQueueEntry entry = new SortedQueueEntry(this,message, ++_entryId);
             entry.setKey(key);
 
             insert(entry);
@@ -75,9 +75,9 @@ public class SortedQueueEntryList implem
      * Red Black Tree insert implementation.
      * @param entry the entry to insert.
      */
-    private void insert(final SortedQueueEntryImpl entry)
+    private void insert(final SortedQueueEntry entry)
     {
-        SortedQueueEntryImpl node = _root;
+        SortedQueueEntry node;
         if((node = _root) == null)
         {
             _root = entry;
@@ -87,7 +87,7 @@ public class SortedQueueEntryList implem
         }
         else
         {
-            SortedQueueEntryImpl parent = null;
+            SortedQueueEntry parent = null;
             while(node != null)
             {
                 parent = node;
@@ -105,7 +105,7 @@ public class SortedQueueEntryList implem
             if(entry.compareTo(parent) < 0)
             {
                 parent.setLeft(entry);
-                final SortedQueueEntryImpl prev = parent.getPrev();
+                final SortedQueueEntry prev = parent.getPrev();
                 entry.setNext(parent);
                 prev.setNext(entry);
                 entry.setPrev(prev);
@@ -115,7 +115,7 @@ public class SortedQueueEntryList implem
             {
                 parent.setRight(entry);
 
-                final SortedQueueEntryImpl next = parent.getNextValidEntry();
+                final SortedQueueEntry next = parent.getNextValidEntry();
                 entry.setNext(next);
                 parent.setNext(entry);
 
@@ -130,15 +130,15 @@ public class SortedQueueEntryList implem
         insertFixup(entry);
     }
 
-    private void insertFixup(SortedQueueEntryImpl entry)
+    private void insertFixup(SortedQueueEntry entry)
     {
         while(isParentColour(entry, Colour.RED))
         {
-            final SortedQueueEntryImpl grandparent = nodeGrandparent(entry);
+            final SortedQueueEntry grandparent = nodeGrandparent(entry);
 
             if(nodeParent(entry) == leftChild(grandparent))
             {
-                final SortedQueueEntryImpl y = rightChild(grandparent);
+                final SortedQueueEntry y = rightChild(grandparent);
                 if(isNodeColour(y, Colour.RED))
                 {
                     setColour(nodeParent(entry), Colour.BLACK);
@@ -160,7 +160,7 @@ public class SortedQueueEntryList implem
             }
             else
             {
-                final SortedQueueEntryImpl y = leftChild(grandparent);
+                final SortedQueueEntry y = leftChild(grandparent);
                 if(isNodeColour(y, Colour.RED))
                 {
                     setColour(nodeParent(entry), Colour.BLACK);
@@ -184,11 +184,11 @@ public class SortedQueueEntryList implem
         _root.setColour(Colour.BLACK);
     }
 
-    private void leftRotate(final SortedQueueEntryImpl entry)
+    private void leftRotate(final SortedQueueEntry entry)
     {
         if(entry != null)
         {
-            final SortedQueueEntryImpl rightChild = rightChild(entry);
+            final SortedQueueEntry rightChild = rightChild(entry);
             entry.setRight(rightChild.getLeft());
             if(entry.getRight() != null)
             {
@@ -212,11 +212,11 @@ public class SortedQueueEntryList implem
         }
     }
 
-    private void rightRotate(final SortedQueueEntryImpl entry)
+    private void rightRotate(final SortedQueueEntry entry)
     {
         if(entry != null)
         {
-            final SortedQueueEntryImpl leftChild = leftChild(entry);
+            final SortedQueueEntry leftChild = leftChild(entry);
             entry.setLeft(leftChild.getRight());
             if(entry.getLeft() != null)
             {
@@ -240,7 +240,7 @@ public class SortedQueueEntryList implem
         }
     }
 
-    private void setColour(final SortedQueueEntryImpl node, final Colour colour)
+    private void setColour(final SortedQueueEntry node, final Colour colour)
     {
         if(node != null)
         {
@@ -248,45 +248,45 @@ public class SortedQueueEntryList implem
         }
     }
 
-    private SortedQueueEntryImpl leftChild(final SortedQueueEntryImpl node)
+    private SortedQueueEntry leftChild(final SortedQueueEntry node)
     {
         return node == null ? null : node.getLeft();
     }
 
-    private SortedQueueEntryImpl rightChild(final SortedQueueEntryImpl node)
+    private SortedQueueEntry rightChild(final SortedQueueEntry node)
     {
         return node == null ? null : node.getRight();
     }
 
-    private SortedQueueEntryImpl nodeParent(final SortedQueueEntryImpl node)
+    private SortedQueueEntry nodeParent(final SortedQueueEntry node)
     {
         return node == null ? null : node.getParent();
     }
 
-    private SortedQueueEntryImpl nodeGrandparent(final SortedQueueEntryImpl node)
+    private SortedQueueEntry nodeGrandparent(final SortedQueueEntry node)
     {
         return nodeParent(nodeParent(node));
     }
 
-    private boolean isParentColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour)
+    private boolean isParentColour(final SortedQueueEntry node, final SortedQueueEntry.Colour colour)
     {
 
         return node != null && isNodeColour(node.getParent(), colour);
     }
 
-    protected boolean isNodeColour(final SortedQueueEntryImpl node, final SortedQueueEntryImpl.Colour colour)
+    protected boolean isNodeColour(final SortedQueueEntry node, final SortedQueueEntry.Colour colour)
     {
         return (node == null ? Colour.BLACK : node.getColour()) == colour;
     }
 
-    public SortedQueueEntryImpl next(final SortedQueueEntryImpl node)
+    public SortedQueueEntry next(final SortedQueueEntry node)
     {
         synchronized(_lock)
         {
             if(node.isDeleted() && _head != node)
             {
-                SortedQueueEntryImpl current = _head;
-                SortedQueueEntryImpl next;
+                SortedQueueEntry current = _head;
+                SortedQueueEntry next;
                 while(current != null)
                 {
                     next = current.getNextValidEntry();
@@ -308,22 +308,22 @@ public class SortedQueueEntryList implem
         }
     }
 
-    public QueueEntryIterator<SortedQueueEntryImpl> iterator()
+    public QueueEntryIterator<SortedQueueEntry,SortedQueue,SortedQueueEntryList,QueueConsumer<?,SortedQueueEntry,SortedQueue,SortedQueueEntryList>> iterator()
     {
         return new QueueEntryIteratorImpl(_head);
     }
 
-    public SortedQueueEntryImpl getHead()
+    public SortedQueueEntry getHead()
     {
         return _head;
     }
 
-    protected SortedQueueEntryImpl getRoot()
+    protected SortedQueueEntry getRoot()
     {
         return _root;
     }
 
-    public void entryDeleted(final SortedQueueEntryImpl entry)
+    public void entryDeleted(final SortedQueueEntry entry)
     {
         synchronized(_lock)
         {
@@ -336,20 +336,20 @@ public class SortedQueueEntryList implem
 
             // Then deal with the easy doubly linked list deletion (need to do
             // this after the swap as the swap uses next
-            final SortedQueueEntryImpl prev = entry.getPrev();
+            final SortedQueueEntry prev = entry.getPrev();
             if(prev != null)
             {
                 prev.setNext(entry.getNextValidEntry());
             }
 
-            final SortedQueueEntryImpl next = entry.getNextValidEntry();
+            final SortedQueueEntry next = entry.getNextValidEntry();
             if(next != null)
             {
                 next.setPrev(prev);
             }
 
             // now deal with splicing
-            final SortedQueueEntryImpl chosenChild;
+            final SortedQueueEntry chosenChild;
 
             if(leftChild(entry) != null)
             {
@@ -428,14 +428,14 @@ public class SortedQueueEntryList implem
     /**
      * Swaps the position of the node in the tree with it's successor
      * (that is the node with the next highest key)
-     * @param entry
+     * @param entry the entry to be swapped with its successor
      */
-    private void swapWithSuccessor(final SortedQueueEntryImpl entry)
+    private void swapWithSuccessor(final SortedQueueEntry entry)
     {
-        final SortedQueueEntryImpl next = entry.getNextValidEntry();
-        final SortedQueueEntryImpl nextParent = next.getParent();
-        final SortedQueueEntryImpl nextLeft = next.getLeft();
-        final SortedQueueEntryImpl nextRight = next.getRight();
+        final SortedQueueEntry next = entry.getNextValidEntry();
+        final SortedQueueEntry nextParent = next.getParent();
+        final SortedQueueEntry nextLeft = next.getLeft();
+        final SortedQueueEntry nextRight = next.getRight();
         final Colour nextColour = next.getColour();
 
         // Special case - the successor is the right child of the node
@@ -530,7 +530,7 @@ public class SortedQueueEntryList implem
         }
     }
 
-    private void deleteFixup(SortedQueueEntryImpl entry)
+    private void deleteFixup(SortedQueueEntry entry)
     {
         int i = 0;
         while(entry != null && entry != _root
@@ -545,7 +545,7 @@ public class SortedQueueEntryList implem
 
             if(entry == leftChild(nodeParent(entry)))
             {
-                SortedQueueEntryImpl rightSibling = rightChild(nodeParent(entry));
+                SortedQueueEntry rightSibling = rightChild(nodeParent(entry));
                 if(isNodeColour(rightSibling, Colour.RED))
                 {
                     setColour(rightSibling, Colour.BLACK);
@@ -578,7 +578,7 @@ public class SortedQueueEntryList implem
             }
             else
             {
-                SortedQueueEntryImpl leftSibling = leftChild(nodeParent(entry));
+                SortedQueueEntry leftSibling = leftChild(nodeParent(entry));
                 if(isNodeColour(leftSibling, Colour.RED))
                 {
                     setColour(leftSibling, Colour.BLACK);
@@ -613,16 +613,16 @@ public class SortedQueueEntryList implem
         setColour(entry, Colour.BLACK);
     }
 
-    private Colour getColour(final SortedQueueEntryImpl x)
+    private Colour getColour(final SortedQueueEntry x)
     {
         return x == null ? null : x.getColour();
     }
 
-    public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntryImpl>
+    public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntry,SortedQueue,SortedQueueEntryList,QueueConsumer<?,SortedQueueEntry,SortedQueue,SortedQueueEntryList>>
     {
-        private SortedQueueEntryImpl _lastNode;
+        private SortedQueueEntry _lastNode;
 
-        public QueueEntryIteratorImpl(final SortedQueueEntryImpl startNode)
+        public QueueEntryIteratorImpl(final SortedQueueEntry startNode)
         {
             _lastNode = startNode;
         }
@@ -632,7 +632,7 @@ public class SortedQueueEntryList implem
             return next(_lastNode) == null;
         }
 
-        public SortedQueueEntryImpl getNode()
+        public SortedQueueEntry getNode()
         {
             return _lastNode;
         }
@@ -641,7 +641,7 @@ public class SortedQueueEntryList implem
         {
             if(!atTail())
             {
-                SortedQueueEntryImpl nextNode = next(_lastNode);
+                SortedQueueEntry nextNode = next(_lastNode);
                 while(nextNode.isDeleted() && next(nextNode) != null)
                 {
                     nextNode = next(nextNode);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java Wed Feb 12 13:27:51 2014
@@ -19,7 +19,7 @@
  */
 package org.apache.qpid.server.queue;
 
-public class SortedQueueEntryListFactory implements QueueEntryListFactory
+public class SortedQueueEntryListFactory implements QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>
 {
 
     private final String _propertyName;
@@ -30,9 +30,8 @@ public class SortedQueueEntryListFactory
     }
 
     @Override
-    public QueueEntryList<SortedQueueEntryImpl> createQueueEntryList(final AMQQueue queue)
+    public SortedQueueEntryList createQueueEntryList(final SortedQueue queue)
     {
         return new SortedQueueEntryList(queue, _propertyName);
     }
-
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Wed Feb 12 13:27:51 2014
@@ -25,7 +25,6 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.transport.TransportException;
 
 import java.util.concurrent.Executor;
@@ -38,7 +37,7 @@ class SubFlushRunner implements Runnable
     private static final Logger _logger = Logger.getLogger(SubFlushRunner.class);
 
 
-    private final Subscription _sub;
+    private final QueueConsumer _sub;
 
     private static int IDLE = 0;
     private static int SCHEDULED = 1;
@@ -51,7 +50,7 @@ class SubFlushRunner implements Runnable
     private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
     private final AtomicBoolean _stateChange = new AtomicBoolean();
 
-    public SubFlushRunner(Subscription sub)
+    public SubFlushRunner(QueueConsumer sub)
     {
         _sub = sub;
     }
@@ -65,7 +64,7 @@ class SubFlushRunner implements Runnable
             try
             {
                 CurrentActor.set(_sub.getLogActor());
-                complete = getQueue().flushSubscription(_sub, ITERATIONS);
+                complete = getQueue().flushConsumer(_sub, ITERATIONS);
             }
             catch (AMQException e)
             {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Wed Feb 12 13:27:51 2014
@@ -1078,7 +1078,7 @@ abstract public class AbstractJDBCMessag
                     stmt.setString(4, "E");
                     for(Transaction.Record record : enqueues)
                     {
-                        stmt.setString(5, record.getQueue().getId().toString());
+                        stmt.setString(5, record.getResource().getId().toString());
                         stmt.setLong(6, record.getMessage().getMessageNumber());
                         stmt.executeUpdate();
                     }
@@ -1089,7 +1089,7 @@ abstract public class AbstractJDBCMessag
                     stmt.setString(4, "D");
                     for(Transaction.Record record : dequeues)
                     {
-                        stmt.setString(5, record.getQueue().getId().toString());
+                        stmt.setString(5, record.getResource().getId().toString());
                         stmt.setLong(6, record.getMessage().getMessageNumber());
                         stmt.executeUpdate();
                     }
@@ -1212,7 +1212,7 @@ abstract public class AbstractJDBCMessag
             buf.position(1);
             buf = buf.slice();
 
-            metaData.writeToBuffer(0, buf);
+            metaData.writeToBuffer(buf);
             ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
             try
             {
@@ -1384,7 +1384,7 @@ abstract public class AbstractJDBCMessag
         }
 
         @Override
-        public TransactionLogResource getQueue()
+        public TransactionLogResource getResource()
         {
             return this;
         }
@@ -1414,10 +1414,22 @@ abstract public class AbstractJDBCMessag
         }
 
         @Override
+        public String getName()
+        {
+            return _queueId.toString();
+        }
+
+        @Override
         public UUID getId()
         {
             return _queueId;
         }
+
+        @Override
+        public boolean isDurable()
+        {
+            return true;
+        }
     }
 
     protected void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
            ('svn:mergeinfo' removed)

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java Wed Feb 12 13:27:51 2014
@@ -29,6 +29,7 @@ import java.util.Map;
 
 import java.util.Set;
 import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.model.Binding;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -46,7 +47,7 @@ public class DurableConfigurationStoreHe
                                                                                                   Queue.EXCLUSIVE,
                                                                                                   Queue.ALTERNATE_EXCHANGE));
 
-    public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
+    public static void updateQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue) throws AMQStoreException
     {
         Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
         attributesMap.put(Queue.NAME, queue.getName());
@@ -71,7 +72,7 @@ public class DurableConfigurationStoreHe
         store.update(queue.getId(), QUEUE, attributesMap);
     }
 
-    public static void createQueue(DurableConfigurationStore store, AMQQueue queue)
+    public static void createQueue(DurableConfigurationStore store, AMQQueue<?,?,?> queue)
             throws AMQStoreException
     {
         Map<String, Object> attributesMap = new HashMap<String, Object>();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java Wed Feb 12 13:27:51 2014
@@ -29,7 +29,7 @@ public interface StorableMessageMetaData
 
     int getStorableSize();
 
-    int writeToBuffer(int offsetInMetaData, ByteBuffer dest);
+    int writeToBuffer(ByteBuffer dest);
 
     int getContentSize();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org