You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/08 18:52:07 UTC

svn commit: r1566069 [2/4] - in /qpid/branches/java-broker-amqp-1-0-management/java: broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/m...

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Sat Feb  8 17:52:05 2014
@@ -47,7 +47,7 @@ import java.util.concurrent.locks.Reentr
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
 
-class QueueConsumer<T extends ConsumerTarget> implements Consumer
+class QueueConsumer<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements Consumer
 {
 
     public static enum State
@@ -61,10 +61,9 @@ class QueueConsumer<T extends ConsumerTa
     private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
     private final AtomicBoolean _closed = new AtomicBoolean(false);
     private final long _id;
-    private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
     private final Lock _stateChangeLock = new ReentrantLock();
     private final long _createTime = System.currentTimeMillis();
-    private final MessageInstance.ConsumerAcquiredState _owningState = new MessageInstance.ConsumerAcquiredState(this);
+    private final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>>(this);
     private final boolean _acquires;
     private final boolean _seesRequeues;
     private final String _consumerName;
@@ -74,8 +73,10 @@ class QueueConsumer<T extends ConsumerTa
     private final FilterManager _filters;
     private final Class<? extends ServerMessage> _messageClass;
     private final Object _sessionReference;
-    private SimpleAMQQueue _queue;
-    private GenericActor _logActor;
+    private Q _queue;
+    private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
+                                                      + "(UNKNOWN)"
+                                                      + "] ");
 
     static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
             new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
@@ -89,10 +90,10 @@ class QueueConsumer<T extends ConsumerTa
 
     private final T _target;
     private final SubFlushRunner _runner = new SubFlushRunner(this);
-    private volatile QueueContext _queueContext;
-    private StateChangeListener<? extends Consumer, State> _stateListener = new StateChangeListener<Consumer, State>()
+    private volatile QueueContext<E,Q,L> _queueContext;
+    private StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> _stateListener = new StateChangeListener<QueueConsumer<T,E,Q,L>, State>()
     {
-        public void stateChanged(Consumer sub, State oldState, State newState)
+        public void stateChanged(QueueConsumer sub, State oldState, State newState)
         {
             CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
         }
@@ -158,8 +159,7 @@ class QueueConsumer<T extends ConsumerTa
                 throw new RuntimeException(e);
             }
         }
-        final StateChangeListener<Consumer, State> stateListener =
-                (StateChangeListener<Consumer, State>) getStateListener();
+        final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> stateListener = getStateListener();
         if(stateListener != null)
         {
             stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
@@ -251,12 +251,12 @@ class QueueConsumer<T extends ConsumerTa
         return STATE_MAP.get(_target.getState());
     }
 
-    public final SimpleAMQQueue getQueue()
+    public final Q getQueue()
     {
         return _queue;
     }
 
-    final void setQueue(SimpleAMQQueue queue, boolean exclusive)
+    final void setQueue(Q queue, boolean exclusive)
     {
         if(getQueue() != null)
         {
@@ -300,9 +300,9 @@ class QueueConsumer<T extends ConsumerTa
         getQueue().flushConsumer(this);
     }
 
-    boolean resend(final MessageInstance entry) throws AMQException
+    boolean resend(final E entry) throws AMQException
     {
-        return getQueue().resend((QueueEntry)entry, this);
+        return getQueue().resend(entry, this);
     }
 
     final SubFlushRunner getRunner()
@@ -315,31 +315,26 @@ class QueueConsumer<T extends ConsumerTa
         return _id;
     }
 
-    public final StateChangeListener<? extends Consumer, State> getStateListener()
+    public final StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> getStateListener()
     {
         return _stateListener;
     }
 
-    public final void setStateListener(StateChangeListener<? extends Consumer, State> listener)
+    public final void setStateListener(StateChangeListener<? super QueueConsumer<T,E,Q,L>, State> listener)
     {
         _stateListener = listener;
     }
 
-    final QueueContext getQueueContext()
+    final QueueContext<E,Q,L> getQueueContext()
     {
         return _queueContext;
     }
 
-    final void setQueueContext(QueueContext queueContext)
+    final void setQueueContext(QueueContext<E,Q,L> queueContext)
     {
         _queueContext = queueContext;
     }
 
-    protected boolean updateState(State from, State to)
-    {
-        return _state.compareAndSet(from, to);
-    }
-
     public final boolean isActive()
     {
         return getState() == State.ACTIVE;
@@ -355,7 +350,7 @@ class QueueConsumer<T extends ConsumerTa
         _noLocal = noLocal;
     }
 
-    public final boolean hasInterest(MessageInstance entry)
+    public final boolean hasInterest(E entry)
     {
        //check that the message hasn't been rejected
         if (entry.isRejectedBy(this))
@@ -405,7 +400,6 @@ class QueueConsumer<T extends ConsumerTa
                 filterLogString.append(delimiter);
             }
             filterLogString.append("Browser");
-            hasEntries = true;
         }
 
         return filterLogString.toString();
@@ -431,7 +425,7 @@ class QueueConsumer<T extends ConsumerTa
         return _createTime;
     }
 
-    final MessageInstance.ConsumerAcquiredState getOwningState()
+    final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState()
     {
         return _owningState;
     }
@@ -466,7 +460,7 @@ class QueueConsumer<T extends ConsumerTa
         return _deliveredCount.longValue();
     }
 
-    final void send(final QueueEntry entry, final boolean batch) throws AMQException
+    final void send(final E entry, final boolean batch) throws AMQException
     {
         _deliveredCount.incrementAndGet();
         ServerMessage message = entry.getMessage();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java Sat Feb  8 17:52:05 2014
@@ -24,19 +24,19 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-class QueueConsumerList
+class QueueConsumerList<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
 {
-    private final ConsumerNode _head = new ConsumerNode();
+    private final ConsumerNode<E,Q,L> _head = new ConsumerNode<E,Q,L>();
 
-    private final AtomicReference<ConsumerNode> _tail = new AtomicReference<ConsumerNode>(_head);
-    private final AtomicReference<ConsumerNode> _subNodeMarker = new AtomicReference<ConsumerNode>(_head);
+    private final AtomicReference<ConsumerNode<E,Q,L>> _tail = new AtomicReference<ConsumerNode<E,Q,L>>(_head);
+    private final AtomicReference<ConsumerNode<E,Q,L>> _subNodeMarker = new AtomicReference<ConsumerNode<E,Q,L>>(_head);
     private final AtomicInteger _size = new AtomicInteger();
 
-    public static final class ConsumerNode
+    public static final class ConsumerNode<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
     {
         private final AtomicBoolean _deleted = new AtomicBoolean();
-        private final AtomicReference<ConsumerNode> _next = new AtomicReference<ConsumerNode>();
-        private final QueueConsumer _sub;
+        private final AtomicReference<ConsumerNode<E,Q,L>> _next = new AtomicReference<ConsumerNode<E,Q,L>>();
+        private final QueueConsumer<?,E,Q,L> _sub;
 
         public ConsumerNode()
         {
@@ -45,7 +45,7 @@ class QueueConsumerList
             _deleted.set(true);
         }
 
-        public ConsumerNode(final QueueConsumer sub)
+        public ConsumerNode(final QueueConsumer<?,E,Q,L> sub)
         {
             //used for regular node construction
             _sub = sub;
@@ -57,12 +57,12 @@ class QueueConsumerList
          *
          * @return the next non-deleted node, or null if none was found.
          */
-        public ConsumerNode findNext()
+        public ConsumerNode<E,Q,L> findNext()
         {
-            ConsumerNode next = nextNode();
+            ConsumerNode<E,Q,L> next = nextNode();
             while(next != null && next.isDeleted())
             {
-                final ConsumerNode newNext = next.nextNode();
+                final ConsumerNode<E,Q,L> newNext = next.nextNode();
                 if(newNext != null)
                 {
                     //try to move our _next reference forward to the 'newNext'
@@ -86,7 +86,7 @@ class QueueConsumerList
          *
          * @return the immediately next node in the structure, or null if at the tail.
          */
-        protected ConsumerNode nextNode()
+        protected ConsumerNode<E,Q,L> nextNode()
         {
             return _next.get();
         }
@@ -97,7 +97,7 @@ class QueueConsumerList
          * @param node the ConsumerNode to set as 'next'
          * @return whether the operation succeeded
          */
-        private boolean setNext(final ConsumerNode node)
+        private boolean setNext(final ConsumerNode<E,Q,L> node)
         {
             return _next.compareAndSet(null, node);
         }
@@ -112,18 +112,18 @@ class QueueConsumerList
             return _deleted.compareAndSet(false,true);
         }
 
-        public QueueConsumer getConsumer()
+        public QueueConsumer<?,E,Q,L> getConsumer()
         {
             return _sub;
         }
     }
 
-    private void insert(final ConsumerNode node, final boolean count)
+    private void insert(final ConsumerNode<E,Q,L> node, final boolean count)
     {
         for (;;)
         {
-            ConsumerNode tail = _tail.get();
-            ConsumerNode next = tail.nextNode();
+            ConsumerNode<E,Q,L> tail = _tail.get();
+            ConsumerNode<E,Q,L> next = tail.nextNode();
             if (tail == _tail.get())
             {
                 if (next == null)
@@ -146,16 +146,16 @@ class QueueConsumerList
         }
     }
 
-    public void add(final QueueConsumer sub)
+    public void add(final QueueConsumer<?,E,Q,L> sub)
     {
-        ConsumerNode node = new ConsumerNode(sub);
+        ConsumerNode<E,Q,L> node = new ConsumerNode<E,Q,L>(sub);
         insert(node, true);
     }
 
-    public boolean remove(final QueueConsumer sub)
+    public boolean remove(final QueueConsumer<?, E,Q,L> sub)
     {
-        ConsumerNode prevNode = _head;
-        ConsumerNode node = _head.nextNode();
+        ConsumerNode<E,Q,L> prevNode = _head;
+        ConsumerNode<E,Q,L> node = _head.nextNode();
 
         while(node != null)
         {
@@ -170,7 +170,7 @@ class QueueConsumerList
                     //correctness reasons, however we have just 'deleted'
                     //the tail. Inserting an empty dummy node after it will
                     //let us scavenge the node containing the Consumer.
-                    insert(new ConsumerNode(), false);
+                    insert(new ConsumerNode<E,Q,L>(), false);
                 }
 
                 //advance the next node reference in the 'prevNode' to scavenge
@@ -189,9 +189,9 @@ class QueueConsumerList
         return false;
     }
 
-    private void nodeMarkerCleanup(final ConsumerNode node)
+    private void nodeMarkerCleanup(final ConsumerNode<E,Q,L> node)
     {
-        ConsumerNode markedNode = _subNodeMarker.get();
+        ConsumerNode<E,Q,L> markedNode = _subNodeMarker.get();
         if(node == markedNode)
         {
             //if the marked node is the one we are removing, then
@@ -200,7 +200,7 @@ class QueueConsumerList
             //into the list and find the next node to use.
             //Because we inserted a dummy if node was the
             //tail, markedNode.nextNode() can never be null.
-            ConsumerNode dummy = new ConsumerNode();
+            ConsumerNode<E,Q,L> dummy = new ConsumerNode<E,Q,L>();
             dummy.setNext(markedNode.nextNode());
 
             //if the CAS fails the marked node has changed, thus
@@ -219,7 +219,7 @@ class QueueConsumerList
         }
     }
 
-    public boolean updateMarkedNode(final ConsumerNode expected, final ConsumerNode nextNode)
+    public boolean updateMarkedNode(final ConsumerNode<E,Q,L> expected, final ConsumerNode<E,Q,L> nextNode)
     {
         return _subNodeMarker.compareAndSet(expected, nextNode);
     }
@@ -231,41 +231,41 @@ class QueueConsumerList
      *
      * @return the previously marked node (or a dummy if it was subsequently deleted)
      */
-    public ConsumerNode getMarkedNode()
+    public ConsumerNode<E,Q,L> getMarkedNode()
     {
         return _subNodeMarker.get();
     }
 
 
-    public static class ConsumerNodeIterator
+    public static class ConsumerNodeIterator<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
     {
-        private ConsumerNode _lastNode;
+        private ConsumerNode<E,Q,L> _lastNode;
 
-        ConsumerNodeIterator(ConsumerNode startNode)
+        ConsumerNodeIterator(ConsumerNode<E,Q,L> startNode)
         {
             _lastNode = startNode;
         }
 
-        public ConsumerNode getNode()
+        public ConsumerNode<E,Q,L> getNode()
         {
             return _lastNode;
         }
 
         public boolean advance()
         {
-            ConsumerNode nextNode = _lastNode.findNext();
+            ConsumerNode<E,Q,L> nextNode = _lastNode.findNext();
             _lastNode = nextNode;
 
             return _lastNode != null;
         }
     }
 
-    public ConsumerNodeIterator iterator()
+    public ConsumerNodeIterator<E,Q,L> iterator()
     {
-        return new ConsumerNodeIterator(_head);
+        return new ConsumerNodeIterator<E,Q,L>(_head);
     }
 
-    public ConsumerNode getHead()
+    public ConsumerNode<E,Q,L> getHead()
     {
         return _head;
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java Sat Feb  8 17:52:05 2014
@@ -23,32 +23,32 @@ package org.apache.qpid.server.queue;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-final class QueueContext
+final class QueueContext<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
 {
-    private volatile QueueEntry _lastSeenEntry;
-    private volatile QueueEntry _releasedEntry;
+    private volatile E _lastSeenEntry;
+    private volatile E _releasedEntry;
 
-    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
             _lastSeenUpdater =
         AtomicReferenceFieldUpdater.newUpdater
-        (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
-    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+        (QueueContext.class, QueueEntryImpl.class, "_lastSeenEntry");
+    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
             _releasedUpdater =
         AtomicReferenceFieldUpdater.newUpdater
-        (QueueContext.class, QueueEntry.class, "_releasedEntry");
+        (QueueContext.class, QueueEntryImpl.class, "_releasedEntry");
 
-    public QueueContext(QueueEntry head)
+    public QueueContext(E head)
     {
         _lastSeenEntry = head;
     }
 
-    public QueueEntry getLastSeenEntry()
+    public E getLastSeenEntry()
     {
         return _lastSeenEntry;
     }
 
 
-    QueueEntry getReleasedEntry()
+    E getReleasedEntry()
     {
         return _releasedEntry;
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sat Feb  8 17:52:05 2014
@@ -20,20 +20,21 @@
 */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.message.MessageInstance;
 
-public interface QueueEntry extends MessageInstance<QueueConsumer>, Comparable<QueueEntry>
+public interface QueueEntry<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer> extends MessageInstance<E,C>, Comparable<E>
 {
 
-    AMQQueue<QueueConsumer> getQueue();
+    Q getQueue();
 
     long getSize();
 
     boolean isQueueDeleted();
 
-    QueueEntry getNextNode();
+    E getNextNode();
 
-    QueueEntry getNextValidEntry();
+    E getNextValidEntry();
 
 
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sat Feb  8 17:52:05 2014
@@ -44,11 +44,11 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-public abstract class QueueEntryImpl implements QueueEntry
+public abstract class QueueEntryImpl<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements QueueEntry<E,Q,QueueConsumer<?,E,Q,L>>
 {
     private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
 
-    private final QueueEntryList _queueEntryList;
+    private final L _queueEntryList;
 
     private final MessageReference _message;
 
@@ -63,7 +63,7 @@ public abstract class QueueEntryImpl imp
         (QueueEntryImpl.class, EntryState.class, "_state");
 
 
-    private volatile Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> _stateChangeListeners;
+    private volatile Set<StateChangeListener<? super E, State>> _stateChangeListeners;
 
     private static final
         AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -90,14 +90,14 @@ public abstract class QueueEntryImpl imp
     private boolean _deliveredToConsumer;
 
 
-    public QueueEntryImpl(QueueEntryList<?> queueEntryList)
+    public QueueEntryImpl(L queueEntryList)
     {
         this(queueEntryList,null,Long.MIN_VALUE);
         _state = DELETED_STATE;
     }
 
 
-    public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message, final long entryId)
+    public QueueEntryImpl(L queueEntryList, ServerMessage message, final long entryId)
     {
         _queueEntryList = queueEntryList;
 
@@ -107,7 +107,7 @@ public abstract class QueueEntryImpl imp
         populateInstanceProperties();
     }
 
-    public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message)
+    public QueueEntryImpl(L queueEntryList, ServerMessage message)
     {
         _queueEntryList = queueEntryList;
         _message = message == null ? null :  message.newReference();
@@ -138,7 +138,7 @@ public abstract class QueueEntryImpl imp
         return _entryId;
     }
 
-    public AMQQueue<QueueConsumer> getQueue()
+    public Q getQueue()
     {
         return _queueEntryList.getQueue();
     }
@@ -234,12 +234,12 @@ public abstract class QueueEntryImpl imp
 
             if(state instanceof ConsumerAcquiredState)
             {
-                getQueue().decrementUnackedMsgCount(this);
+                getQueue().decrementUnackedMsgCount((E) this);
             }
 
             if(!getQueue().isDeleted())
             {
-                getQueue().requeue(this);
+                getQueue().requeue((E)this);
                 if(_stateChangeListeners != null)
                 {
                     notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
@@ -315,13 +315,12 @@ public abstract class QueueEntryImpl imp
 
         if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
         {
-            Consumer s = null;
             if (state instanceof ConsumerAcquiredState)
             {
-                getQueue().decrementUnackedMsgCount(this);
+                getQueue().decrementUnackedMsgCount((E) this);
             }
 
-            getQueue().dequeue(this,s);
+            getQueue().dequeue((E)this);
             if(_stateChangeListeners != null)
             {
                 notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
@@ -333,9 +332,9 @@ public abstract class QueueEntryImpl imp
 
     private void notifyStateChange(final State oldState, final State newState)
     {
-        for(StateChangeListener<MessageInstance<QueueConsumer>, State> l : _stateChangeListeners)
+        for(StateChangeListener<? super E, State> l : _stateChangeListeners)
         {
-            l.stateChanged(this, oldState, newState);
+            l.stateChanged((E)this, oldState, newState);
         }
     }
 
@@ -345,7 +344,7 @@ public abstract class QueueEntryImpl imp
 
         if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
         {
-            _queueEntryList.entryDeleted(this);
+            _queueEntryList.entryDeleted((E)this);
             onDelete();
             _message.release();
 
@@ -364,7 +363,7 @@ public abstract class QueueEntryImpl imp
         dispose();
     }
 
-    public int routeToAlternate(final Action<MessageInstance<? extends Consumer>> action, ServerTransaction txn)
+    public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action, ServerTransaction txn)
     {
         final AMQQueue currentQueue = getQueue();
         Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -412,21 +411,21 @@ public abstract class QueueEntryImpl imp
         return getQueue().isDeleted();
     }
 
-    public void addStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
+    public void addStateChangeListener(StateChangeListener<? super E,State> listener)
     {
-        Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners;
+        Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
         if(listeners == null)
         {
-            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance<QueueConsumer>, State>>());
+            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<? super E, State>>());
             listeners = _stateChangeListeners;
         }
 
         listeners.add(listener);
     }
 
-    public boolean removeStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
+    public boolean removeStateChangeListener(StateChangeListener<? super E, State> listener)
     {
-        Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners;
+        Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
         if(listeners != null)
         {
             return listeners.remove(listener);
@@ -436,9 +435,9 @@ public abstract class QueueEntryImpl imp
     }
 
 
-    public int compareTo(final QueueEntry o)
+    public int compareTo(final E o)
     {
-        QueueEntryImpl other = (QueueEntryImpl)o;
+        E other = o;
         return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
     }
 
@@ -446,7 +445,7 @@ public abstract class QueueEntryImpl imp
     {
     }
 
-    public QueueEntryList getQueueEntryList()
+    public L getQueueEntryList()
     {
         return _queueEntryList;
     }
@@ -494,10 +493,10 @@ public abstract class QueueEntryImpl imp
     @Override
     public boolean resend() throws AMQException
     {
-        QueueConsumer sub = getDeliveredConsumer();
+        QueueConsumer<?,E,Q,L> sub = getDeliveredConsumer();
         if(sub != null)
         {
-            return sub.resend(this);
+            return sub.resend((E)this);
         }
         return false;
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java Sat Feb  8 17:52:05 2014
@@ -20,11 +20,13 @@
 */
 package org.apache.qpid.server.queue;
 
-public interface QueueEntryIterator<QE extends QueueEntry>
+import org.apache.qpid.server.consumer.Consumer;
+
+public interface QueueEntryIterator<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
 {
     boolean atTail();
 
-    QE getNode();
+    E getNode();
 
     boolean advance();
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Sat Feb  8 17:52:05 2014
@@ -20,21 +20,23 @@
 */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.message.ServerMessage;
 
-public interface QueueEntryList<Q extends QueueEntry>
+public interface QueueEntryList<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
 {
-    AMQQueue<QueueConsumer> getQueue();
+    Q getQueue();
 
-    Q add(ServerMessage message);
+    E add(ServerMessage message);
 
-    Q next(Q node);
+    E next(E node);
 
-    QueueEntryIterator<Q> iterator();
+    QueueEntryIterator<E,Q,L,C> iterator();
 
-    Q getHead();
+    E getHead();
 
-    void entryDeleted(Q queueEntry);
+    void entryDeleted(E queueEntry);
     
     int getPriorities();
+
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java Sat Feb  8 17:52:05 2014
@@ -20,7 +20,7 @@
 */
 package org.apache.qpid.server.queue;
 
-interface QueueEntryListFactory
+interface QueueEntryListFactory<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
 {
-    public QueueEntryList createQueueEntryList(AMQQueue queue);
+    public L createQueueEntryList(Q queue);
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java Sat Feb  8 17:52:05 2014
@@ -16,7 +16,9 @@
 */
 package org.apache.qpid.server.queue;
 
-public interface QueueEntryVisitor
+import org.apache.qpid.server.consumer.Consumer;
+
+public interface QueueEntryVisitor<E extends QueueEntry>
 {
-    boolean visit(QueueEntry entry);
+    boolean visit(E entry);
 }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sat Feb  8 17:52:05 2014
@@ -43,7 +43,6 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.QueueMessages;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
@@ -52,6 +51,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -59,9 +59,9 @@ import org.apache.qpid.server.util.Actio
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
-                                       StateChangeListener<QueueConsumer, QueueConsumer.State>,
-                                       MessageGroupManager.ConsumerResetHelper
+abstract class SimpleAMQQueue<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E, Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements AMQQueue<E, Q, QueueConsumer<?,E,Q,L>>,
+                                       StateChangeListener<QueueConsumer<?,E,Q,L>, QueueConsumer.State>,
+                                       MessageGroupManager.ConsumerResetHelper<E,Q,L>
 {
 
     private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -94,11 +94,11 @@ public class SimpleAMQQueue implements A
     private Exchange _alternateExchange;
 
 
-    private final QueueEntryList<QueueEntry> _entries;
+    private final L _entries;
 
-    private final QueueConsumerList _consumerList = new QueueConsumerList();
+    private final QueueConsumerList<E,Q,L> _consumerList = new QueueConsumerList<E,Q,L>();
 
-    private volatile QueueConsumer _exclusiveSubscriber;
+    private volatile QueueConsumer<?,E,Q,L> _exclusiveSubscriber;
 
 
 
@@ -163,8 +163,7 @@ public class SimpleAMQQueue implements A
     private LogSubject _logSubject;
     private LogActor _logActor;
 
-    private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
-    private boolean _nolocal;
+    private boolean _noLocal;
 
     private final AtomicBoolean _overfull = new AtomicBoolean(false);
     private boolean _deleteOnNoConsumers;
@@ -177,20 +176,15 @@ public class SimpleAMQQueue implements A
 
     /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
     private int _maximumDeliveryCount;
-    private final MessageGroupManager _messageGroupManager;
+    private final MessageGroupManager<E,Q,L> _messageGroupManager;
 
-    private final Collection<ConsumerRegistrationListener> _consumerListeners =
-            new ArrayList<ConsumerRegistrationListener>();
+    private final Collection<ConsumerRegistrationListener<Q>> _consumerListeners =
+            new ArrayList<ConsumerRegistrationListener<Q>>();
 
     private AMQQueue.NotificationListener _notificationListener;
     private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
 
 
-    public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
-    {
-        this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
-    }
-
     protected SimpleAMQQueue(UUID id,
                              String name,
                              boolean durable,
@@ -198,7 +192,7 @@ public class SimpleAMQQueue implements A
                              boolean autoDelete,
                              boolean exclusive,
                              VirtualHost virtualHost,
-                             QueueEntryListFactory entryListFactory, Map<String,Object> arguments)
+                             QueueEntryListFactory<E,Q,L> entryListFactory, Map<String,Object> arguments)
     {
 
         if (name == null)
@@ -217,7 +211,7 @@ public class SimpleAMQQueue implements A
         _autoDelete = autoDelete;
         _exclusive = exclusive;
         _virtualHost = virtualHost;
-        _entries = entryListFactory.createQueueEntryList(this);
+        _entries = entryListFactory.createQueueEntryList((Q)this);
         _arguments = Collections.synchronizedMap(arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments));
 
         _id = id;
@@ -243,13 +237,13 @@ public class SimpleAMQQueue implements A
             {
                 Object defaultGroup = arguments.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
                 _messageGroupManager =
-                        new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
+                        new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(Queue.MESSAGE_GROUP_KEY)),
                                 defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
                                 this);
             }
             else
             {
-                _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get(
+                _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(arguments.get(
                         Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
             }
         }
@@ -281,21 +275,20 @@ public class SimpleAMQQueue implements A
         }
         catch (RejectedExecutionException ree)
         {
-            if (_stopped.get())
-            {
-                // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
-            }
-            else
+            // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
+            if(!_stopped.get())
             {
                 _logger.error("Unexpected rejected execution", ree);
                 throw ree;
+
             }
+
         }
     }
 
     public void setNoLocal(boolean nolocal)
     {
-        _nolocal = nolocal;
+        _noLocal = nolocal;
     }
 
     public UUID getId()
@@ -384,7 +377,7 @@ public class SimpleAMQQueue implements A
 
 
     @Override
-    public synchronized QueueConsumer addConsumer(final ConsumerTarget target,
+    public synchronized <T extends ConsumerTarget> QueueConsumer<T,E,Q,L> addConsumer(final T target,
                                      final FilterManager filters,
                                      final Class<? extends ServerMessage> messageClass,
                                      final String consumerName,
@@ -412,10 +405,10 @@ public class SimpleAMQQueue implements A
             throw new ExistingConsumerPreventsExclusive();
         }
 
-        QueueConsumer consumer = new QueueConsumer(filters, messageClass,
-                                                                optionSet.contains(Consumer.Option.ACQUIRES),
-                                                                optionSet.contains(Consumer.Option.SEES_REQUEUES),
-                                                                consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
+        QueueConsumer<T,E,Q,L> consumer = new QueueConsumer<T,E,Q,L>(filters, messageClass,
+                                                         optionSet.contains(Consumer.Option.ACQUIRES),
+                                                         optionSet.contains(Consumer.Option.SEES_REQUEUES),
+                                                         consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
         target.consumerAdded(consumer);
 
 
@@ -430,21 +423,21 @@ public class SimpleAMQQueue implements A
         }
 
         consumer.setStateListener(this);
-        consumer.setQueueContext(new QueueContext(_entries.getHead()));
+        consumer.setQueueContext(new QueueContext<E,Q,L>(_entries.getHead()));
 
         if (!isDeleted())
         {
-            consumer.setQueue(this, exclusive);
-            if(_nolocal)
+            consumer.setQueue((Q)this, exclusive);
+            if(_noLocal)
             {
-                consumer.setNoLocal(_nolocal);
+                consumer.setNoLocal(true);
             }
 
             synchronized (_consumerListeners)
             {
-                for(ConsumerRegistrationListener listener : _consumerListeners)
+                for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
                 {
-                    listener.consumerAdded(this, consumer);
+                    listener.consumerAdded((Q)this, consumer);
                 }
             }
 
@@ -466,7 +459,7 @@ public class SimpleAMQQueue implements A
 
     }
 
-    synchronized void unregisterConsumer(final QueueConsumer consumer) throws AMQException
+    synchronized void unregisterConsumer(final QueueConsumer<?,E,Q,L> consumer) throws AMQException
     {
         if (consumer == null)
         {
@@ -494,9 +487,9 @@ public class SimpleAMQQueue implements A
 
             synchronized (_consumerListeners)
             {
-                for(ConsumerRegistrationListener listener : _consumerListeners)
+                for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
                 {
-                    listener.consumerRemoved(this, consumer);
+                    listener.consumerRemoved((Q)this, consumer);
                 }
             }
 
@@ -519,10 +512,10 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public Collection<QueueConsumer> getConsumers()
+    public Collection<QueueConsumer<?,E,Q,L>> getConsumers()
     {
-        List<QueueConsumer> consumers = new ArrayList<QueueConsumer>();
-        QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator();
+        List<QueueConsumer<?,E,Q,L>> consumers = new ArrayList<QueueConsumer<?,E,Q,L>>();
+        QueueConsumerList.ConsumerNodeIterator<E,Q,L> iter = _consumerList.iterator();
         while(iter.advance())
         {
             consumers.add(iter.getNode().getConsumer());
@@ -531,7 +524,7 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
+    public void addConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
     {
         synchronized (_consumerListeners)
         {
@@ -539,7 +532,7 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
+    public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
     {
         synchronized (_consumerListeners)
         {
@@ -547,9 +540,9 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments)
+    public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments)
     {
-        QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
+        E entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
         if(clearAssignments)
         {
             _messageGroupManager.clearAssignments(consumer);
@@ -557,11 +550,11 @@ public class SimpleAMQQueue implements A
 
         if(entry != null)
         {
-            QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
+            QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
             // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
             while (subscriberIter.advance())
             {
-                QueueConsumer sub = subscriberIter.getNode().getConsumer();
+                QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
 
                 // we don't make browsers send the same stuff twice
                 if (sub.seesRequeues())
@@ -599,11 +592,6 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public int getBindingCountHigh()
-    {
-        return _bindingCountHigh.get();
-    }
-
     public void removeBinding(final Binding binding)
     {
         _bindings.remove(binding);
@@ -626,7 +614,7 @@ public class SimpleAMQQueue implements A
 
     // ------ Enqueue / Dequeue
 
-    public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
+    public void enqueue(ServerMessage message, Action<? super MessageInstance<?, QueueConsumer<?,E,Q,L>>> action) throws AMQException
     {
         incrementQueueCount();
         incrementQueueSize(message);
@@ -634,8 +622,8 @@ public class SimpleAMQQueue implements A
         _totalMessagesReceived.incrementAndGet();
 
 
-        QueueEntry entry;
-        final QueueConsumer exclusiveSub = _exclusiveSubscriber;
+        E entry;
+        final QueueConsumer<?,E,Q,L> exclusiveSub = _exclusiveSubscriber;
         entry = _entries.add(message);
 
         if(action != null || (exclusiveSub == null  && _queueRunner.isIdle()))
@@ -645,8 +633,8 @@ public class SimpleAMQQueue implements A
             iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
 
              */
-            QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
-            QueueConsumerList.ConsumerNode nextNode = node.findNext();
+            QueueConsumerList.ConsumerNode<E,Q,L> node = _consumerList.getMarkedNode();
+            QueueConsumerList.ConsumerNode<E,Q,L> nextNode = node.findNext();
             if (nextNode == null)
             {
                 nextNode = _consumerList.getHead().findNext();
@@ -682,7 +670,7 @@ public class SimpleAMQQueue implements A
                 else
                 {
                     // if consumer at end, and active, offer
-                    QueueConsumer sub = nextNode.getConsumer();
+                    QueueConsumer<?,E,Q,L> sub = nextNode.getConsumer();
                     deliverToConsumer(sub, entry);
                 }
                 nextNode = nextNode.findNext();
@@ -714,7 +702,7 @@ public class SimpleAMQQueue implements A
 
     }
 
-    private void deliverToConsumer(final QueueConsumer sub, final QueueEntry entry)
+    private void deliverToConsumer(final QueueConsumer<?,E,Q,L> sub, final E entry)
             throws AMQException
     {
 
@@ -746,7 +734,7 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    private boolean assign(final QueueConsumer sub, final QueueEntry entry)
+    private boolean assign(final QueueConsumer<?,E,Q,L> sub, final E entry)
     {
         if(_messageGroupManager == null)
         {
@@ -760,7 +748,7 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry)
+    private boolean mightAssign(final QueueConsumer<?,E,Q,L> sub, final E entry)
     {
         if(_messageGroupManager == null || !sub.acquires())
         {
@@ -770,7 +758,7 @@ public class SimpleAMQQueue implements A
         return (assigned == null) || (assigned == sub);
     }
 
-    protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
+    protected void checkConsumersNotAheadOfDelivery(final E entry)
     {
         // This method is only required for queues which mess with ordering
         // Simple Queues don't :-)
@@ -804,7 +792,7 @@ public class SimpleAMQQueue implements A
         getAtomicQueueCount().incrementAndGet();
     }
 
-    private void deliverMessage(final QueueConsumer sub, final QueueEntry entry, boolean batch)
+    private void deliverMessage(final QueueConsumer<?,E,Q,L> sub, final E entry, boolean batch)
             throws AMQException
     {
         setLastSeenEntry(sub, entry);
@@ -815,18 +803,18 @@ public class SimpleAMQQueue implements A
         sub.send(entry, batch);
     }
 
-    private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final QueueEntry entry) throws AMQException
+    private boolean consumerReadyAndHasInterest(final QueueConsumer<?,E,Q,L> sub, final E entry) throws AMQException
     {
         return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
     }
 
 
-    private void setLastSeenEntry(final QueueConsumer sub, final QueueEntry entry)
+    private void setLastSeenEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
     {
-        QueueContext subContext = sub.getQueueContext();
+        QueueContext<E,Q,L> subContext = sub.getQueueContext();
         if (subContext != null)
         {
-            QueueEntry releasedEntry = subContext.getReleasedEntry();
+            E releasedEntry = subContext.getReleasedEntry();
 
             QueueContext._lastSeenUpdater.set(subContext, entry);
             if(releasedEntry == entry)
@@ -836,13 +824,13 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    private void updateSubRequeueEntry(final QueueConsumer sub, final QueueEntry entry)
+    private void updateSubRequeueEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
     {
 
-        QueueContext subContext = sub.getQueueContext();
+        QueueContext<E,Q,L> subContext = sub.getQueueContext();
         if(subContext != null)
         {
-            QueueEntry oldEntry;
+            E oldEntry;
 
             while((oldEntry  = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0)
             {
@@ -854,13 +842,13 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public void requeue(QueueEntry entry)
+    public void requeue(E entry)
     {
-        QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
+        QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
         // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
         while (subscriberIter.advance() && entry.isAvailable())
         {
-            QueueConsumer sub = subscriberIter.getNode().getConsumer();
+            QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
 
             // we don't make browsers send the same stuff twice
             if (sub.seesRequeues())
@@ -873,7 +861,8 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void dequeue(QueueEntry entry, Consumer sub)
+    @Override
+    public void dequeue(E entry)
     {
         decrementQueueCount();
         decrementQueueSize(entry);
@@ -886,7 +875,7 @@ public class SimpleAMQQueue implements A
 
     }
 
-    private void decrementQueueSize(final QueueEntry entry)
+    private void decrementQueueSize(final E entry)
     {
         final ServerMessage message = entry.getMessage();
         long size = message.getSize();
@@ -905,7 +894,7 @@ public class SimpleAMQQueue implements A
         _dequeueCount.incrementAndGet();
     }
 
-    public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException
+    public boolean resend(final E entry, final QueueConsumer<?,E,Q,L> consumer) throws AMQException
     {
         /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
                   entry to resend and move back the consumer pointer. */
@@ -915,7 +904,7 @@ public class SimpleAMQQueue implements A
         {
             if (!consumer.isClosed())
             {
-                deliverMessage((QueueConsumer) consumer, entry, false);
+                deliverMessage(consumer, entry, false);
                 return true;
             }
             else
@@ -981,11 +970,11 @@ public class SimpleAMQQueue implements A
 
     public long getOldestMessageArrivalTime()
     {
-        QueueEntry entry = getOldestQueueEntry();
+        E entry = getOldestQueueEntry();
         return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
     }
 
-    protected QueueEntry getOldestQueueEntry()
+    protected E getOldestQueueEntry()
     {
         return _entries.next(_entries.getHead());
     }
@@ -995,13 +984,13 @@ public class SimpleAMQQueue implements A
         return _deleted.get();
     }
 
-    public List<QueueEntry> getMessagesOnTheQueue()
+    public List<E> getMessagesOnTheQueue()
     {
-        ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
-        QueueEntryIterator queueListIterator = _entries.iterator();
+        ArrayList<E> entryList = new ArrayList<E>();
+        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
         while (queueListIterator.advance())
         {
-            QueueEntry node = queueListIterator.getNode();
+            E node = queueListIterator.getNode();
             if (node != null && !node.isDeleted())
             {
                 entryList.add(node);
@@ -1011,7 +1000,7 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void stateChanged(QueueConsumer sub, QueueConsumer.State oldState, QueueConsumer.State newState)
+    public void stateChanged(QueueConsumer<?,E,Q,L> sub, QueueConsumer.State oldState, QueueConsumer.State newState)
     {
         if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE)
         {
@@ -1029,7 +1018,7 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public int compareTo(final AMQQueue o)
+    public int compareTo(final Q o)
     {
         return _name.compareTo(o.getName());
     }
@@ -1049,7 +1038,7 @@ public class SimpleAMQQueue implements A
         return _exclusiveSubscriber != null;
     }
 
-    private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber)
+    private void setExclusiveSubscriber(QueueConsumer<?,E,Q,L> exclusiveSubscriber)
     {
         _exclusiveSubscriber = exclusiveSubscriber;
     }
@@ -1060,32 +1049,32 @@ public class SimpleAMQQueue implements A
     }
 
     /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
-    protected QueueEntryList getEntries()
+    protected L getEntries()
     {
         return _entries;
     }
 
-    protected QueueConsumerList getConsumerList()
+    protected QueueConsumerList<E,Q,L> getConsumerList()
     {
         return _consumerList;
     }
 
 
-    public static interface QueueEntryFilter
+    public static interface QueueEntryFilter<E extends QueueEntry>
     {
-        public boolean accept(QueueEntry entry);
+        public boolean accept(E entry);
 
         public boolean filterComplete();
     }
 
 
 
-    public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
+    public List<E> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
     {
-        return getMessagesOnTheQueue(new QueueEntryFilter()
+        return getMessagesOnTheQueue(new QueueEntryFilter<E>()
         {
 
-            public boolean accept(QueueEntry entry)
+            public boolean accept(E entry)
             {
                 final long messageId = entry.getMessage().getMessageNumber();
                 return messageId >= fromMessageId && messageId <= toMessageId;
@@ -1098,13 +1087,13 @@ public class SimpleAMQQueue implements A
         });
     }
 
-    public QueueEntry getMessageOnTheQueue(final long messageId)
+    public E getMessageOnTheQueue(final long messageId)
     {
-        List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+        List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
         {
             private boolean _complete;
 
-            public boolean accept(QueueEntry entry)
+            public boolean accept(E entry)
             {
                 _complete = entry.getMessage().getMessageNumber() == messageId;
                 return _complete;
@@ -1118,13 +1107,13 @@ public class SimpleAMQQueue implements A
         return entries.isEmpty() ? null : entries.get(0);
     }
 
-    public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
+    public List<E> getMessagesOnTheQueue(QueueEntryFilter<E> filter)
     {
-        ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
-        QueueEntryIterator queueListIterator = _entries.iterator();
+        ArrayList<E> entryList = new ArrayList<E>();
+        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
         while (queueListIterator.advance() && !filter.filterComplete())
         {
-            QueueEntry node = queueListIterator.getNode();
+            E node = queueListIterator.getNode();
             if (!node.isDeleted() && filter.accept(node))
             {
                 entryList.add(node);
@@ -1134,13 +1123,13 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void visit(final QueueEntryVisitor visitor)
+    public void visit(final QueueEntryVisitor<E> visitor)
     {
-        QueueEntryIterator queueListIterator = _entries.iterator();
+        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
 
         while(queueListIterator.advance())
         {
-            QueueEntry node = queueListIterator.getNode();
+            E node = queueListIterator.getNode();
 
             if(!node.isDeleted())
             {
@@ -1157,17 +1146,17 @@ public class SimpleAMQQueue implements A
      *
      * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
      * Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
-     * @param fromPosition
-     * @param toPosition
-     * @return
+     * @param fromPosition first message position
+     * @param toPosition last message position
+     * @return list of messages
      */
-    public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
+    public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
     {
-        return getMessagesOnTheQueue(new QueueEntryFilter()
+        return getMessagesOnTheQueue(new QueueEntryFilter<E>()
                                         {
                                             private long position = 0;
 
-                                            public boolean accept(QueueEntry entry)
+                                            public boolean accept(E entry)
                                             {
                                                 position++;
                                                 return (position >= fromPosition) && (position <= toPosition);
@@ -1196,12 +1185,12 @@ public class SimpleAMQQueue implements A
     // TODO - now only used by the tests
     public void deleteMessageFromTop()
     {
-        QueueEntryIterator queueListIterator = _entries.iterator();
+        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
         boolean noDeletes = true;
 
         while (noDeletes && queueListIterator.advance())
         {
-            QueueEntry node = queueListIterator.getNode();
+            E node = queueListIterator.getNode();
             if (node.acquire())
             {
                 dequeueEntry(node);
@@ -1224,14 +1213,14 @@ public class SimpleAMQQueue implements A
             throw new AMQSecurityException("Permission denied: queue " + getName());
         }
 
-        QueueEntryIterator queueListIterator = _entries.iterator();
+        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
         long count = 0;
 
         ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
 
         while (queueListIterator.advance())
         {
-            QueueEntry node = queueListIterator.getNode();
+            E node = queueListIterator.getNode();
             if (node.acquire())
             {
                 dequeueEntry(node, txn);
@@ -1248,13 +1237,13 @@ public class SimpleAMQQueue implements A
         return count;
     }
 
-    private void dequeueEntry(final QueueEntry node)
+    private void dequeueEntry(final E node)
     {
         ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore());
         dequeueEntry(node, txn);
     }
 
-    private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
+    private void dequeueEntry(final E node, ServerTransaction txn)
     {
         txn.dequeue(this, node.getMessage(),
                     new ServerTransaction.Action()
@@ -1283,7 +1272,7 @@ public class SimpleAMQQueue implements A
     }
 
     // TODO list all thrown exceptions
-    public int delete() throws AMQSecurityException, AMQException
+    public int delete() throws AMQException
     {
         // Check access
         if (!_virtualHost.getSecurityManager().authoriseDelete(this))
@@ -1313,10 +1302,10 @@ public class SimpleAMQQueue implements A
             }
 
 
-            List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+            List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
             {
 
-                public boolean accept(QueueEntry entry)
+                public boolean accept(E entry)
                 {
                     return entry.acquire();
                 }
@@ -1330,7 +1319,7 @@ public class SimpleAMQQueue implements A
             ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
 
 
-            for(final QueueEntry entry : entries)
+            for(final E entry : entries)
             {
                 // TODO log requeues with a post enqueue action
                 int requeues = entry.routeToAlternate(null, txn);
@@ -1435,7 +1424,7 @@ public class SimpleAMQQueue implements A
 
     }
 
-    public void deliverAsync(QueueConsumer sub)
+    public void deliverAsync(QueueConsumer<?,E,Q,L> sub)
     {
         if(_exclusiveSubscriber == null)
         {
@@ -1449,7 +1438,7 @@ public class SimpleAMQQueue implements A
 
     }
 
-    void flushConsumer(QueueConsumer sub) throws AMQException
+    void flushConsumer(QueueConsumer<?,E,Q,L> sub) throws AMQException
     {
         // Access control
         if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
@@ -1459,7 +1448,7 @@ public class SimpleAMQQueue implements A
         flushConsumer(sub, Long.MAX_VALUE);
     }
 
-    boolean flushConsumer(QueueConsumer sub, long iterations) throws AMQException
+    boolean flushConsumer(QueueConsumer<?,E,Q,L> sub, long iterations) throws AMQException
     {
         boolean atTail = false;
         final boolean keepSendLockHeld = iterations <=  SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
@@ -1480,8 +1469,8 @@ public class SimpleAMQQueue implements A
                         sub.getSendLock();
                     }
 
-                    atTail = attemptDelivery((QueueConsumer)sub, true);
-                    if (atTail && getNextAvailableEntry((QueueConsumer)sub) == null)
+                    atTail = attemptDelivery(sub, true);
+                    if (atTail && getNextAvailableEntry(sub) == null)
                     {
                         queueEmpty = true;
                     }
@@ -1532,12 +1521,12 @@ public class SimpleAMQQueue implements A
      * Looks up the next node for the consumer and attempts to deliver it.
      *
      *
-     * @param sub
-     * @param batch
+     * @param sub the consumer
+     * @param batch true if processing can be batched
      * @return true if we have completed all possible deliveries for this sub.
      * @throws AMQException
      */
-    private boolean attemptDelivery(QueueConsumer sub, boolean batch) throws AMQException
+    private boolean attemptDelivery(QueueConsumer<?,E,Q,L> sub, boolean batch) throws AMQException
     {
         boolean atTail = false;
 
@@ -1545,7 +1534,7 @@ public class SimpleAMQQueue implements A
         if (subActive)
         {
 
-            QueueEntry node  = getNextAvailableEntry(sub);
+            E node  = getNextAvailableEntry(sub);
 
             if (node != null && node.isAvailable())
             {
@@ -1582,11 +1571,11 @@ public class SimpleAMQQueue implements A
 
     protected void advanceAllConsumers() throws AMQException
     {
-        QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+        QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
         while (consumerNodeIterator.advance())
         {
-            QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode();
-            QueueConsumer sub = subNode.getConsumer();
+            QueueConsumerList.ConsumerNode<E,Q,L> subNode = consumerNodeIterator.getNode();
+            QueueConsumer<?,E,Q,L> sub = subNode.getConsumer();
             if(sub.acquires())
             {
                 getNextAvailableEntry(sub);
@@ -1598,16 +1587,16 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
+    private E getNextAvailableEntry(final QueueConsumer<?,E,Q,L> sub)
             throws AMQException
     {
-        QueueContext context = sub.getQueueContext();
+        QueueContext<E,Q,L> context = sub.getQueueContext();
         if(context != null)
         {
-            QueueEntry lastSeen = context.getLastSeenEntry();
-            QueueEntry releasedNode = context.getReleasedEntry();
+            E lastSeen = context.getLastSeenEntry();
+            E releasedNode = context.getReleasedEntry();
 
-            QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+            E node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
 
             boolean expired = false;
             while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
@@ -1639,12 +1628,12 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub)
+    public boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub)
     {
-        QueueContext context = sub.getQueueContext();
+        QueueContext<E,Q,L> context = sub.getQueueContext();
         if(context != null)
         {
-            QueueEntry releasedNode = context.getReleasedEntry();
+            E releasedNode = context.getReleasedEntry();
             return releasedNode != null && releasedNode.compareTo(entry) < 0;
         }
         else
@@ -1681,7 +1670,7 @@ public class SimpleAMQQueue implements A
      */
     public long processQueue(QueueRunner runner) throws AMQException
     {
-        long stateChangeCount = Long.MIN_VALUE;
+        long stateChangeCount;
         long previousStateChangeCount = Long.MIN_VALUE;
         long rVal = Long.MIN_VALUE;
         boolean deliveryIncomplete = true;
@@ -1716,11 +1705,11 @@ public class SimpleAMQQueue implements A
             boolean allConsumersDone = true;
             boolean consumerDone;
 
-            QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+            QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
             //iterate over the subscribers and try to advance their pointer
             while (consumerNodeIterator.advance())
             {
-                QueueConsumer sub = consumerNodeIterator.getNode().getConsumer();
+                QueueConsumer<?,E,Q,L> sub = consumerNodeIterator.getNode().getConsumer();
                 sub.getSendLock();
 
                     try
@@ -1802,11 +1791,11 @@ public class SimpleAMQQueue implements A
 
     public void checkMessageStatus() throws AMQException
     {
-        QueueEntryIterator queueListIterator = _entries.iterator();
+        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
 
         while (queueListIterator.advance())
         {
-            QueueEntry node = queueListIterator.getNode();
+            E node = queueListIterator.getNode();
             // Only process nodes that are not currently deleted and not dequeued
             if (!node.isDeleted())
             {
@@ -1953,12 +1942,12 @@ public class SimpleAMQQueue implements A
         return _notificationChecks;
     }
 
-    private final class QueueEntryListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State>
+    private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State>
     {
 
-        private final QueueConsumer _sub;
+        private final QueueConsumer<?,E,Q,L> _sub;
 
-        public QueueEntryListener(final QueueConsumer sub)
+        public QueueEntryListener(final QueueConsumer<?,E,Q,L> sub)
         {
             _sub = sub;
         }
@@ -1974,7 +1963,7 @@ public class SimpleAMQQueue implements A
             return System.identityHashCode(_sub);
         }
 
-        public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
+        public void stateChanged(E entry, QueueEntry.State oldSate, QueueEntry.State newState)
         {
             entry.removeStateChangeListener(this);
             deliverAsync(_sub);
@@ -2082,13 +2071,13 @@ public class SimpleAMQQueue implements A
         return _unackedMsgBytes.get();
     }
 
-    public void decrementUnackedMsgCount(QueueEntry queueEntry)
+    public void decrementUnackedMsgCount(E queueEntry)
     {
         _unackedMsgCount.decrementAndGet();
         _unackedMsgBytes.addAndGet(-queueEntry.getSize());
     }
 
-    private void incrementUnackedMsgCount(QueueEntry entry)
+    private void incrementUnackedMsgCount(E entry)
     {
         _unackedMsgCount.incrementAndGet();
         _unackedMsgBytes.addAndGet(entry.getSize());
@@ -2159,10 +2148,10 @@ public class SimpleAMQQueue implements A
         return (String) _arguments.get(Queue.DESCRIPTION);
     }
 
-    public final int send(final ServerMessage message,
+    public final  <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
                               final InstanceProperties instanceProperties,
                               final ServerTransaction txn,
-                              final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
+                              final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
     {
             txn.enqueue(this,message, new ServerTransaction.Action()
             {

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1566069&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Sat Feb  8 17:52:05 2014
@@ -0,0 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public interface SimpleQueueEntryList<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends QueueEntryList<E,Q,L,QueueConsumer<?,E,Q,L>>
+{
+}

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1566069&r1=1566068&r2=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Sat Feb  8 17:52:05 2014
@@ -29,7 +29,7 @@ import org.apache.qpid.server.virtualhos
 import java.util.Map;
 import java.util.UUID;
 
-public class SortedQueue extends OutOfOrderQueue
+public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
 {
     //Lock object to synchronize enqueue. Used instead of the object
     //monitor to prevent lock order issues with consumer sendLocks
@@ -41,17 +41,33 @@ public class SortedQueue extends OutOfOr
                             final boolean durable, final String owner, final boolean autoDelete,
                             final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
     {
+        this(id, name, durable, owner, autoDelete, exclusive,
+             virtualHost, arguments, sortedPropertyName, new SortedQueueEntryListFactory(sortedPropertyName));
+    }
+
+
+    protected SortedQueue(UUID id, final String name,
+                          final boolean durable, final String owner, final boolean autoDelete,
+                          final boolean exclusive, final VirtualHost virtualHost,
+                          Map<String, Object> arguments,
+                          String sortedPropertyName,
+                          QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList> factory)
+    {
         super(id, name, durable, owner, autoDelete, exclusive,
-                virtualHost, new SortedQueueEntryListFactory(sortedPropertyName), arguments);
+              virtualHost, factory, arguments);
         this._sortedPropertyName = sortedPropertyName;
     }
 
+
     public String getSortedPropertyName()
     {
         return _sortedPropertyName;
     }
 
-    public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
+    @Override
+    public void enqueue(final ServerMessage message,
+                        final Action<? super MessageInstance<?, QueueConsumer<?, SortedQueueEntry, SortedQueue, SortedQueueEntryList>>> action)
+            throws AMQException
     {
         synchronized (_sortedQueueLock)
         {

Copied: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java (from r1565732, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java?p2=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java&p1=qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java&r1=1565732&r2=1566069&rev=1566069&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java Sat Feb  8 17:52:05 2014
@@ -24,37 +24,37 @@ import org.apache.qpid.server.message.Se
 /**
  * An implementation of QueueEntryImpl to be used in SortedQueueEntryList.
  */
-public class SortedQueueEntryImpl extends QueueEntryImpl
+public class SortedQueueEntry extends QueueEntryImpl<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
 {
     public static enum Colour
     {
         RED, BLACK
     };
 
-    private volatile SortedQueueEntryImpl _next;
-    private SortedQueueEntryImpl _prev;
+    private volatile SortedQueueEntry _next;
+    private SortedQueueEntry _prev;
     private String _key;
 
     private Colour _colour = Colour.BLACK;
-    private SortedQueueEntryImpl _parent;
-    private SortedQueueEntryImpl _left;
-    private SortedQueueEntryImpl _right;
+    private SortedQueueEntry _parent;
+    private SortedQueueEntry _left;
+    private SortedQueueEntry _right;
 
-    public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList)
+    public SortedQueueEntry(final SortedQueueEntryList queueEntryList)
     {
         super(queueEntryList);
     }
 
-    public SortedQueueEntryImpl(final SortedQueueEntryList queueEntryList,
-            final ServerMessage message, final long entryId)
+    public SortedQueueEntry(final SortedQueueEntryList queueEntryList,
+                            final ServerMessage message, final long entryId)
     {
         super(queueEntryList, message, entryId);
     }
 
     @Override
-    public int compareTo(final QueueEntry o)
+    public int compareTo(final SortedQueueEntry o)
     {
-        final String otherKey = ((SortedQueueEntryImpl) o)._key;
+        final String otherKey = o._key;
         final int compare = _key == null ? (otherKey == null ? 0 : -1) : otherKey == null ? 1 : _key.compareTo(otherKey);
         return compare == 0 ? super.compareTo(o) : compare;
     }
@@ -69,33 +69,33 @@ public class SortedQueueEntryImpl extend
         return _key;
     }
 
-    public SortedQueueEntryImpl getLeft()
+    public SortedQueueEntry getLeft()
     {
         return _left;
     }
 
-    public SortedQueueEntryImpl getNextNode()
+    public SortedQueueEntry getNextNode()
     {
         return _next;
     }
 
     @Override
-    public SortedQueueEntryImpl getNextValidEntry()
+    public SortedQueueEntry getNextValidEntry()
     {
         return getNextNode();
     }
 
-    public SortedQueueEntryImpl getParent()
+    public SortedQueueEntry getParent()
     {
         return _parent;
     }
 
-    public SortedQueueEntryImpl getPrev()
+    public SortedQueueEntry getPrev()
     {
         return _prev;
     }
 
-    public SortedQueueEntryImpl getRight()
+    public SortedQueueEntry getRight()
     {
         return _right;
     }
@@ -110,27 +110,27 @@ public class SortedQueueEntryImpl extend
         _key = key;
     }
 
-    public void setLeft(final SortedQueueEntryImpl left)
+    public void setLeft(final SortedQueueEntry left)
     {
         _left = left;
     }
 
-    public void setNext(final SortedQueueEntryImpl next)
+    public void setNext(final SortedQueueEntry next)
     {
         _next = next;
     }
 
-    public void setParent(final SortedQueueEntryImpl parent)
+    public void setParent(final SortedQueueEntry parent)
     {
         _parent = parent;
     }
 
-    public void setPrev(final SortedQueueEntryImpl prev)
+    public void setPrev(final SortedQueueEntry prev)
     {
         _prev = prev;
     }
 
-    public void setRight(final SortedQueueEntryImpl right)
+    public void setRight(final SortedQueueEntry right)
     {
         _right = right;
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org