You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/27 00:27:43 UTC

svn commit: r1572343 [3/7] - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/binding/ broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/...

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Feb 26 23:27:39 2014
@@ -18,6 +18,8 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.lang.reflect.Type;
+import java.security.AccessControlException;
 import java.security.AccessController;
 import java.security.Principal;
 import java.util.*;
@@ -30,14 +32,17 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.model.ExclusivityPolicy;
-import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.exchange.NonDefaultExchange;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.adapter.AbstractConfiguredObject;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
@@ -49,11 +54,11 @@ import org.apache.qpid.server.message.In
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
@@ -65,14 +70,14 @@ import org.apache.qpid.server.util.Serve
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import javax.management.NotificationListener;
 import javax.security.auth.Subject;
 
-abstract class AbstractQueue<E extends QueueEntryImpl<E,Q,L>,
-                              Q extends AbstractQueue<E, Q,L>,
-                              L extends QueueEntryListBase<E,Q,L>>
-        implements AMQQueue<E, Q, QueueConsumer<?,E,Q,L>>,
-                   StateChangeListener<QueueConsumer<?,E,Q,L>, QueueConsumer.State>,
-                   MessageGroupManager.ConsumerResetHelper<E,Q,L>
+public abstract class AbstractQueue
+        extends AbstractConfiguredObject<AbstractQueue>
+        implements AMQQueue<AbstractQueue>,
+                   StateChangeListener<QueueConsumer<?>, State>,
+                   MessageGroupManager.ConsumerResetHelper
 {
 
     private static final Logger _logger = Logger.getLogger(AbstractQueue.class);
@@ -83,6 +88,16 @@ abstract class AbstractQueue<E extends Q
 
     // TODO - should make this configurable at the vhost / broker level
     private static final int DEFAULT_MAX_GROUPS = 255;
+    private static final QueueNotificationListener NULL_NOTIFICATION_LISTENER = new QueueNotificationListener()
+    {
+        @Override
+        public void notifyClients(final NotificationCheck notification,
+                                  final Queue queue,
+                                  final String notificationMsg)
+        {
+
+        }
+    };
 
     private final VirtualHost _virtualHost;
 
@@ -93,14 +108,14 @@ abstract class AbstractQueue<E extends Q
 
     private final boolean _durable;
 
-    private Exchange _alternateExchange;
+    private NonDefaultExchange _alternateExchange;
 
 
-    private final L _entries;
+    private final QueueEntryList _entries;
 
-    private final QueueConsumerList<E,Q,L> _consumerList = new QueueConsumerList<E,Q,L>();
+    private final QueueConsumerList _consumerList = new QueueConsumerList();
 
-    private volatile QueueConsumer<?,E,Q,L> _exclusiveSubscriber;
+    private volatile QueueConsumer<?> _exclusiveSubscriber;
 
 
 
@@ -163,8 +178,8 @@ abstract class AbstractQueue<E extends Q
     private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
-    private final List<Action<? super Q>> _deleteTaskList =
-            new CopyOnWriteArrayList<Action<? super Q>>();
+    private final List<Action<? super AMQQueue>> _deleteTaskList =
+            new CopyOnWriteArrayList<Action<? super AMQQueue>>();
 
 
     private LogSubject _logSubject;
@@ -173,7 +188,7 @@ abstract class AbstractQueue<E extends Q
     private boolean _noLocal;
 
     private final AtomicBoolean _overfull = new AtomicBoolean(false);
-    private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
+    private final CopyOnWriteArrayList<BindingImpl> _bindings = new CopyOnWriteArrayList<BindingImpl>();
     private UUID _id;
     private final Map<String, Object> _arguments;
 
@@ -182,18 +197,20 @@ abstract class AbstractQueue<E extends Q
 
     /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
     private int _maximumDeliveryCount;
-    private final MessageGroupManager<E,Q,L> _messageGroupManager;
+    private final MessageGroupManager _messageGroupManager;
 
-    private final Collection<ConsumerRegistrationListener<Q>> _consumerListeners =
-            new ArrayList<ConsumerRegistrationListener<Q>>();
+    private final Collection<ConsumerRegistrationListener<? super MessageSource>> _consumerListeners =
+            new ArrayList<ConsumerRegistrationListener<? super MessageSource>>();
 
-    private AMQQueue.NotificationListener _notificationListener;
+    private QueueNotificationListener  _notificationListener;
     private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
 
     protected AbstractQueue(VirtualHost virtualHost,
                             Map<String, Object> attributes,
-                            QueueEntryListFactory<E, Q, L> entryListFactory)
+                            QueueEntryListFactory entryListFactory)
     {
+        super(MapValueConverter.getUUIDAttribute(Queue.ID, attributes),
+              Collections.<String,Object>emptyMap(), attributes, virtualHost.getTaskExecutor());
         if (virtualHost == null)
         {
             throw new IllegalArgumentException("Virtual Host must not be null");
@@ -223,7 +240,7 @@ abstract class AbstractQueue<E extends Q
         _name = name;
         _durable = durable;
         _virtualHost = virtualHost;
-        _entries = entryListFactory.createQueueEntryList((Q) this);
+        _entries = entryListFactory.createQueueEntryList(this);
         final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
 
         arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy);
@@ -421,13 +438,13 @@ abstract class AbstractQueue<E extends Q
             {
                 Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP);
                 _messageGroupManager =
-                        new DefinedGroupMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)),
+                        new DefinedGroupMessageGroupManager(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)),
                                 defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(),
                                 this);
             }
             else
             {
-                _messageGroupManager = new AssignedConsumerMessageGroupManager<E,Q,L>(String.valueOf(attributes.get(
+                _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(attributes.get(
                         Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
             }
         }
@@ -499,11 +516,6 @@ abstract class AbstractQueue<E extends Q
         _noLocal = nolocal;
     }
 
-    public UUID getId()
-    {
-        return _id;
-    }
-
     public boolean isDurable()
     {
         return _durable;
@@ -514,12 +526,12 @@ abstract class AbstractQueue<E extends Q
         return _exclusivityPolicy != ExclusivityPolicy.NONE;
     }
 
-    public Exchange getAlternateExchange()
+    public NonDefaultExchange getAlternateExchange()
     {
         return _alternateExchange;
     }
 
-    public void setAlternateExchange(Exchange exchange)
+    public void setAlternateExchange(NonDefaultExchange exchange)
     {
         if(_alternateExchange != null)
         {
@@ -540,9 +552,120 @@ abstract class AbstractQueue<E extends Q
     }
 
     @Override
-    public Object getAttribute(String attrName)
+    public Object getAttribute(String name)
     {
-        return _arguments.get(attrName);
+        if(ALTERNATE_EXCHANGE.equals(name))
+        {
+            return getAlternateExchange();
+        }
+        else if(OWNER.equals(name))
+        {
+            return getOwner();
+        }
+        else if(NAME.equals(name))
+        {
+            return getName();
+        }
+        if(ALERT_REPEAT_GAP.equals(name))
+        {
+            return getAlertRepeatGap();
+        }
+        else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
+        {
+            return getAlertThresholdMessageAge();
+        }
+        else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
+        {
+            return getAlertThresholdMessageSize();
+        }
+        else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
+        {
+            return getAlertThresholdQueueDepthBytes();
+        }
+        else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
+        {
+            return getAlertThresholdQueueDepthMessages();
+        }
+        else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
+        {
+            //We only return the boolean value if message groups are actually in use
+            return _arguments.get(MESSAGE_GROUP_KEY) == null ? null : _arguments.get(MESSAGE_GROUP_SHARED_GROUPS);
+        }
+        else if(LVQ_KEY.equals(name))
+        {
+            if(this instanceof ConflationQueue)
+            {
+                return ((ConflationQueue)this).getConflationKey();
+            }
+        }
+        else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
+        {
+            return getMaximumDeliveryAttempts();
+        }
+        else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
+        {
+            return getQueueFlowControlSizeBytes();
+        }
+        else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
+        {
+            return getQueueFlowResumeSizeBytes();
+        }
+        else if(QUEUE_FLOW_STOPPED.equals(name))
+        {
+            return isOverfull();
+        }
+        else if(SORT_KEY.equals(name))
+        {
+            if(this instanceof SortedQueue)
+            {
+                return ((SortedQueue)this).getSortedPropertyName();
+            }
+        }
+        else if(QUEUE_TYPE.equals(name))
+        {
+            if(this instanceof SortedQueue)
+            {
+                return "sorted";
+            }
+            if(this instanceof ConflationQueue)
+            {
+                return "lvq";
+            }
+            if(this instanceof PriorityQueue)
+            {
+                return "priority";
+            }
+            return "standard";
+        }
+        else if(DURABLE.equals(name))
+        {
+            return isDurable();
+        }
+        else if(ID.equals(name))
+        {
+            return getId();
+        }
+        else if(LIFETIME_POLICY.equals(name))
+        {
+            return getLifetimePolicy();
+        }
+        else if(STATE.equals(name))
+        {
+            return State.ACTIVE; // TODO
+        }
+        else if (DESCRIPTION.equals(name))
+        {
+            return getDescription();
+        }
+        else if(PRIORITIES.equals(name))
+        {
+            if(this instanceof PriorityQueue)
+            {
+                return ((PriorityQueue)this).getPriorities();
+            }
+        }
+
+        return _arguments.get(name);
     }
 
     @Override
@@ -580,7 +703,7 @@ abstract class AbstractQueue<E extends Q
 
 
     @Override
-    public synchronized <T extends ConsumerTarget> QueueConsumer<T,E,Q,L> addConsumer(final T target,
+    public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target,
                                      final FilterManager filters,
                                      final Class<? extends ServerMessage> messageClass,
                                      final String consumerName,
@@ -678,11 +801,12 @@ abstract class AbstractQueue<E extends Q
             throw new ExistingConsumerPreventsExclusive();
         }
 
-        QueueConsumerImpl<T,E,Q,L> consumer = new QueueConsumerImpl<T,E,Q,L>((Q)this,
-                                                                     target,
-                                                                     consumerName,
-                                                                     filters, messageClass,
-                                                                     optionSet);
+        QueueConsumerImpl consumer = new QueueConsumerImpl(this,
+                                                           target,
+                                                           consumerName,
+                                                           filters, 
+                                                           messageClass,
+                                                           optionSet);
 
         _exclusiveOwner = exclusiveOwner;
         target.consumerAdded(consumer);
@@ -699,15 +823,15 @@ abstract class AbstractQueue<E extends Q
         }
 
         consumer.setStateListener(this);
-        consumer.setQueueContext(new QueueContext<E,Q,L>(_entries.getHead()));
+        consumer.setQueueContext(new QueueContext(_entries.getHead()));
 
         if (!isDeleted())
         {
             synchronized (_consumerListeners)
             {
-                for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
+                for(ConsumerRegistrationListener<? super MessageSource> listener : _consumerListeners)
                 {
-                    listener.consumerAdded((Q)this, consumer);
+                    listener.consumerAdded(this, consumer);
                 }
             }
 
@@ -729,7 +853,7 @@ abstract class AbstractQueue<E extends Q
 
     }
 
-    synchronized void unregisterConsumer(final QueueConsumerImpl<?,E,Q,L> consumer)
+    synchronized void unregisterConsumer(final QueueConsumerImpl consumer)
     {
         if (consumer == null)
         {
@@ -758,9 +882,9 @@ abstract class AbstractQueue<E extends Q
 
             synchronized (_consumerListeners)
             {
-                for(ConsumerRegistrationListener<Q> listener : _consumerListeners)
+                for(ConsumerRegistrationListener<? super MessageSource> listener : _consumerListeners)
                 {
-                    listener.consumerRemoved((Q)this, consumer);
+                    listener.consumerRemoved(this, consumer);
                 }
             }
 
@@ -787,10 +911,10 @@ abstract class AbstractQueue<E extends Q
 
     }
 
-    public Collection<QueueConsumer<?,E,Q,L>> getConsumers()
+    public Collection<QueueConsumer<?>> getConsumers()
     {
-        List<QueueConsumer<?,E,Q,L>> consumers = new ArrayList<QueueConsumer<?,E,Q,L>>();
-        QueueConsumerList.ConsumerNodeIterator<E,Q,L> iter = _consumerList.iterator();
+        List<QueueConsumer<?>> consumers = new ArrayList<QueueConsumer<?>>();
+        QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator();
         while(iter.advance())
         {
             consumers.add(iter.getNode().getConsumer());
@@ -799,7 +923,7 @@ abstract class AbstractQueue<E extends Q
 
     }
 
-    public void addConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
+    public void addConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
     {
         synchronized (_consumerListeners)
         {
@@ -807,7 +931,7 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<Q> listener)
+    public void removeConsumerRegistrationListener(final ConsumerRegistrationListener<? super MessageSource> listener)
     {
         synchronized (_consumerListeners)
         {
@@ -815,9 +939,9 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments)
+    public void resetSubPointersForGroups(QueueConsumer<?> consumer, boolean clearAssignments)
     {
-        E entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
+        QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
         if(clearAssignments)
         {
             _messageGroupManager.clearAssignments(consumer);
@@ -825,11 +949,11 @@ abstract class AbstractQueue<E extends Q
 
         if(entry != null)
         {
-            QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
+            QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
             // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
             while (subscriberIter.advance())
             {
-                QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
+                QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
 
                 // we don't make browsers send the same stuff twice
                 if (sub.seesRequeues())
@@ -843,7 +967,7 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    public void addBinding(final Binding binding)
+    public void addBinding(final BindingImpl binding)
     {
         _bindings.add(binding);
         int bindingCount = _bindings.size();
@@ -857,12 +981,12 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    public void removeBinding(final Binding binding)
+    public void removeBinding(final BindingImpl binding)
     {
         _bindings.remove(binding);
     }
 
-    public List<Binding> getBindings()
+    public Collection<BindingImpl> getBindings()
     {
         return Collections.unmodifiableList(_bindings);
     }
@@ -879,7 +1003,7 @@ abstract class AbstractQueue<E extends Q
 
     // ------ Enqueue / Dequeue
 
-    public void enqueue(ServerMessage message, Action<? super MessageInstance<?, QueueConsumer<?,E,Q,L>>> action)
+    public void enqueue(ServerMessage message, Action<? super MessageInstance> action)
     {
         incrementQueueCount();
         incrementQueueSize(message);
@@ -887,8 +1011,8 @@ abstract class AbstractQueue<E extends Q
         _totalMessagesReceived.incrementAndGet();
 
 
-        E entry;
-        final QueueConsumer<?,E,Q,L> exclusiveSub = _exclusiveSubscriber;
+        QueueEntry entry;
+        final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
         entry = _entries.add(message);
 
         if(action != null || (exclusiveSub == null  && _queueRunner.isIdle()))
@@ -898,8 +1022,8 @@ abstract class AbstractQueue<E extends Q
             iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
 
              */
-            QueueConsumerList.ConsumerNode<E,Q,L> node = _consumerList.getMarkedNode();
-            QueueConsumerList.ConsumerNode<E,Q,L> nextNode = node.findNext();
+            QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
+            QueueConsumerList.ConsumerNode nextNode = node.findNext();
             if (nextNode == null)
             {
                 nextNode = _consumerList.getHead().findNext();
@@ -935,7 +1059,7 @@ abstract class AbstractQueue<E extends Q
                 else
                 {
                     // if consumer at end, and active, offer
-                    QueueConsumer<?,E,Q,L> sub = nextNode.getConsumer();
+                    QueueConsumer<?> sub = nextNode.getConsumer();
                     deliverToConsumer(sub, entry);
                 }
                 nextNode = nextNode.findNext();
@@ -967,7 +1091,7 @@ abstract class AbstractQueue<E extends Q
 
     }
 
-    private void deliverToConsumer(final QueueConsumer<?,E,Q,L> sub, final E entry)
+    private void deliverToConsumer(final QueueConsumer<?> sub, final QueueEntry entry)
     {
 
         if(sub.trySendLock())
@@ -998,7 +1122,7 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    private boolean assign(final QueueConsumer<?,E,Q,L> sub, final E entry)
+    private boolean assign(final QueueConsumer<?> sub, final QueueEntry entry)
     {
         if(_messageGroupManager == null)
         {
@@ -1012,17 +1136,17 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    private boolean mightAssign(final QueueConsumer<?,E,Q,L> sub, final E entry)
+    private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry)
     {
         if(_messageGroupManager == null || !sub.acquires())
         {
             return true;
         }
-        QueueConsumer<?,E,Q,L> assigned = _messageGroupManager.getAssignedConsumer(entry);
+        QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
         return (assigned == null) || (assigned == sub);
     }
 
-    protected void checkConsumersNotAheadOfDelivery(final E entry)
+    protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
     {
         // This method is only required for queues which mess with ordering
         // Simple Queues don't :-)
@@ -1041,12 +1165,12 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    public long getTotalDequeueCount()
+    public long getTotalDequeuedMessages()
     {
         return _dequeueCount.get();
     }
 
-    public long getTotalEnqueueCount()
+    public long getTotalEnqueuedMessages()
     {
         return _enqueueCount.get();
     }
@@ -1056,7 +1180,7 @@ abstract class AbstractQueue<E extends Q
         getAtomicQueueCount().incrementAndGet();
     }
 
-    private void deliverMessage(final QueueConsumer<?,E,Q,L> sub, final E entry, boolean batch)
+    private void deliverMessage(final QueueConsumer<?> sub, final QueueEntry entry, boolean batch)
     {
         setLastSeenEntry(sub, entry);
 
@@ -1066,18 +1190,18 @@ abstract class AbstractQueue<E extends Q
         sub.send(entry, batch);
     }
 
-    private boolean consumerReadyAndHasInterest(final QueueConsumer<?,E,Q,L> sub, final E entry)
+    private boolean consumerReadyAndHasInterest(final QueueConsumer<?> sub, final QueueEntry entry)
     {
         return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
     }
 
 
-    private void setLastSeenEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
+    private void setLastSeenEntry(final QueueConsumer<?> sub, final QueueEntry entry)
     {
-        QueueContext<E,Q,L> subContext = sub.getQueueContext();
+        QueueContext subContext = sub.getQueueContext();
         if (subContext != null)
         {
-            E releasedEntry = subContext.getReleasedEntry();
+            QueueEntry releasedEntry = subContext.getReleasedEntry();
 
             QueueContext._lastSeenUpdater.set(subContext, entry);
             if(releasedEntry == entry)
@@ -1087,13 +1211,13 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    private void updateSubRequeueEntry(final QueueConsumer<?,E,Q,L> sub, final E entry)
+    private void updateSubRequeueEntry(final QueueConsumer<?> sub, final QueueEntry entry)
     {
 
-        QueueContext<E,Q,L> subContext = sub.getQueueContext();
+        QueueContext subContext = sub.getQueueContext();
         if(subContext != null)
         {
-            E oldEntry;
+            QueueEntry oldEntry;
 
             while((oldEntry  = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0)
             {
@@ -1105,13 +1229,13 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    public void requeue(E entry)
+    public void requeue(QueueEntry entry)
     {
-        QueueConsumerList.ConsumerNodeIterator<E,Q,L> subscriberIter = _consumerList.iterator();
+        QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
         // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
         while (subscriberIter.advance() && entry.isAvailable())
         {
-            QueueConsumer<?,E,Q,L> sub = subscriberIter.getNode().getConsumer();
+            QueueConsumer<?> sub = subscriberIter.getNode().getConsumer();
 
             // we don't make browsers send the same stuff twice
             if (sub.seesRequeues())
@@ -1125,7 +1249,7 @@ abstract class AbstractQueue<E extends Q
     }
 
     @Override
-    public void dequeue(E entry)
+    public void dequeue(QueueEntry entry)
     {
         decrementQueueCount();
         decrementQueueSize(entry);
@@ -1138,7 +1262,7 @@ abstract class AbstractQueue<E extends Q
 
     }
 
-    private void decrementQueueSize(final E entry)
+    private void decrementQueueSize(final QueueEntry entry)
     {
         final ServerMessage message = entry.getMessage();
         long size = message.getSize();
@@ -1157,7 +1281,7 @@ abstract class AbstractQueue<E extends Q
         _dequeueCount.incrementAndGet();
     }
 
-    public boolean resend(final E entry, final QueueConsumer<?,E,Q,L> consumer)
+    public boolean resend(final QueueEntry entry, final QueueConsumer<?> consumer)
     {
         /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
                   entry to resend and move back the consumer pointer. */
@@ -1188,7 +1312,7 @@ abstract class AbstractQueue<E extends Q
         return _consumerList.size();
     }
 
-    public int getActiveConsumerCount()
+    public int getConsumerCountWithCredit()
     {
         return _activeSubscriberCount.get();
     }
@@ -1200,22 +1324,23 @@ abstract class AbstractQueue<E extends Q
 
     public boolean isEmpty()
     {
-        return getMessageCount() == 0;
+        return getQueueDepthMessages() == 0;
     }
 
-    public int getMessageCount()
+    @Override
+    public int getQueueDepthMessages()
     {
         return getAtomicQueueCount().get();
     }
 
-    public long getQueueDepth()
+    public long getQueueDepthBytes()
     {
         return getAtomicQueueSize().get();
     }
 
     public int getUndeliveredMessageCount()
     {
-        int count = getMessageCount() - _deliveredMessages.get();
+        int count = getQueueDepthMessages() - _deliveredMessages.get();
         if (count < 0)
         {
             return 0;
@@ -1233,11 +1358,11 @@ abstract class AbstractQueue<E extends Q
 
     public long getOldestMessageArrivalTime()
     {
-        E entry = getOldestQueueEntry();
+        QueueEntry entry = getOldestQueueEntry();
         return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime();
     }
 
-    protected E getOldestQueueEntry()
+    protected QueueEntry getOldestQueueEntry()
     {
         return _entries.next(_entries.getHead());
     }
@@ -1247,13 +1372,13 @@ abstract class AbstractQueue<E extends Q
         return _deleted.get();
     }
 
-    public List<E> getMessagesOnTheQueue()
+    public List<QueueEntry> getMessagesOnTheQueue()
     {
-        ArrayList<E> entryList = new ArrayList<E>();
-        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
+        ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
+        QueueEntryIterator queueListIterator = _entries.iterator();
         while (queueListIterator.advance())
         {
-            E node = queueListIterator.getNode();
+            QueueEntry node = queueListIterator.getNode();
             if (node != null && !node.isDeleted())
             {
                 entryList.add(node);
@@ -1263,16 +1388,16 @@ abstract class AbstractQueue<E extends Q
 
     }
 
-    public void stateChanged(QueueConsumer<?,E,Q,L> sub, QueueConsumer.State oldState, QueueConsumer.State newState)
+    public void stateChanged(QueueConsumer<?> sub, State oldState, State newState)
     {
-        if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE)
+        if (oldState == State.ACTIVE && newState != State.ACTIVE)
         {
             _activeSubscriberCount.decrementAndGet();
 
         }
-        else if (newState == QueueConsumer.State.ACTIVE)
+        else if (newState == State.ACTIVE)
         {
-            if (oldState != QueueConsumer.State.ACTIVE)
+            if (oldState != State.ACTIVE)
             {
                 _activeSubscriberCount.incrementAndGet();
 
@@ -1281,7 +1406,7 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    public int compareTo(final Q o)
+    public int compareTo(final AMQQueue o)
     {
         return _name.compareTo(o.getName());
     }
@@ -1301,7 +1426,7 @@ abstract class AbstractQueue<E extends Q
         return _exclusiveSubscriber != null;
     }
 
-    private void setExclusiveSubscriber(QueueConsumer<?,E,Q,L> exclusiveSubscriber)
+    private void setExclusiveSubscriber(QueueConsumer<?> exclusiveSubscriber)
     {
         _exclusiveSubscriber = exclusiveSubscriber;
     }
@@ -1312,32 +1437,32 @@ abstract class AbstractQueue<E extends Q
     }
 
     /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
-    protected L getEntries()
+    protected QueueEntryList getEntries()
     {
         return _entries;
     }
 
-    protected QueueConsumerList<E,Q,L> getConsumerList()
+    protected QueueConsumerList getConsumerList()
     {
         return _consumerList;
     }
 
 
-    public static interface QueueEntryFilter<E extends QueueEntry>
+    public static interface QueueEntryFilter
     {
-        public boolean accept(E entry);
+        public boolean accept(QueueEntry entry);
 
         public boolean filterComplete();
     }
 
 
 
-    public List<E> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
+    public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
     {
-        return getMessagesOnTheQueue(new QueueEntryFilter<E>()
+        return getMessagesOnTheQueue(new QueueEntryFilter()
         {
 
-            public boolean accept(E entry)
+            public boolean accept(QueueEntry entry)
             {
                 final long messageId = entry.getMessage().getMessageNumber();
                 return messageId >= fromMessageId && messageId <= toMessageId;
@@ -1350,13 +1475,13 @@ abstract class AbstractQueue<E extends Q
         });
     }
 
-    public E getMessageOnTheQueue(final long messageId)
+    public QueueEntry getMessageOnTheQueue(final long messageId)
     {
-        List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
+        List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
         {
             private boolean _complete;
 
-            public boolean accept(E entry)
+            public boolean accept(QueueEntry entry)
             {
                 _complete = entry.getMessage().getMessageNumber() == messageId;
                 return _complete;
@@ -1370,13 +1495,13 @@ abstract class AbstractQueue<E extends Q
         return entries.isEmpty() ? null : entries.get(0);
     }
 
-    public List<E> getMessagesOnTheQueue(QueueEntryFilter<E> filter)
+    public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
     {
-        ArrayList<E> entryList = new ArrayList<E>();
-        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
+        ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
+        QueueEntryIterator queueListIterator = _entries.iterator();
         while (queueListIterator.advance() && !filter.filterComplete())
         {
-            E node = queueListIterator.getNode();
+            QueueEntry node = queueListIterator.getNode();
             if (!node.isDeleted() && filter.accept(node))
             {
                 entryList.add(node);
@@ -1386,13 +1511,13 @@ abstract class AbstractQueue<E extends Q
 
     }
 
-    public void visit(final QueueEntryVisitor<E> visitor)
+    public void visit(final QueueEntryVisitor visitor)
     {
-        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
+        QueueEntryIterator queueListIterator = _entries.iterator();
 
         while(queueListIterator.advance())
         {
-            E node = queueListIterator.getNode();
+            QueueEntry node = queueListIterator.getNode();
 
             if(!node.isDeleted())
             {
@@ -1413,13 +1538,13 @@ abstract class AbstractQueue<E extends Q
      * @param toPosition last message position
      * @return list of messages
      */
-    public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
+    public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
     {
-        return getMessagesOnTheQueue(new QueueEntryFilter<E>()
+        return getMessagesOnTheQueue(new QueueEntryFilter()
                                         {
                                             private long position = 0;
 
-                                            public boolean accept(E entry)
+                                            public boolean accept(QueueEntry entry)
                                             {
                                                 position++;
                                                 return (position >= fromPosition) && (position <= toPosition);
@@ -1450,14 +1575,14 @@ abstract class AbstractQueue<E extends Q
         //Perform ACLs
         getVirtualHost().getSecurityManager().authorisePurge(this);
 
-        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
+        QueueEntryIterator queueListIterator = _entries.iterator();
         long count = 0;
 
         ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
 
         while (queueListIterator.advance())
         {
-            E node = queueListIterator.getNode();
+            QueueEntry node = queueListIterator.getNode();
             if (node.acquire())
             {
                 dequeueEntry(node, txn);
@@ -1474,13 +1599,13 @@ abstract class AbstractQueue<E extends Q
         return count;
     }
 
-    private void dequeueEntry(final E node)
+    private void dequeueEntry(final QueueEntry node)
     {
         ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore());
         dequeueEntry(node, txn);
     }
 
-    private void dequeueEntry(final E node, ServerTransaction txn)
+    private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
     {
         txn.dequeue(this, node.getMessage(),
                     new ServerTransaction.Action()
@@ -1499,13 +1624,13 @@ abstract class AbstractQueue<E extends Q
     }
 
     @Override
-    public void addDeleteTask(final Action<? super Q> task)
+    public void addDeleteTask(final Action<? super AMQQueue> task)
     {
         _deleteTaskList.add(task);
     }
 
     @Override
-    public void removeDeleteTask(final Action<? super Q> task)
+    public void removeDeleteTask(final Action<? super AMQQueue> task)
     {
         _deleteTaskList.remove(task);
     }
@@ -1519,9 +1644,9 @@ abstract class AbstractQueue<E extends Q
         if (!_deleted.getAndSet(true))
         {
 
-            final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings);
+            final ArrayList<BindingImpl> bindingCopy = new ArrayList<BindingImpl>(_bindings);
 
-            for (Binding b : bindingCopy)
+            for (BindingImpl b : bindingCopy)
             {
                 b.delete();
             }
@@ -1538,10 +1663,10 @@ abstract class AbstractQueue<E extends Q
             }
 
 
-            List<E> entries = getMessagesOnTheQueue(new QueueEntryFilter<E>()
+            List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
             {
 
-                public boolean accept(E entry)
+                public boolean accept(QueueEntry entry)
                 {
                     return entry.acquire();
                 }
@@ -1555,7 +1680,7 @@ abstract class AbstractQueue<E extends Q
             ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
 
 
-            for(final E entry : entries)
+            for(final QueueEntry entry : entries)
             {
                 // TODO log requeues with a post enqueue action
                 int requeues = entry.routeToAlternate(null, txn);
@@ -1574,9 +1699,9 @@ abstract class AbstractQueue<E extends Q
             }
 
 
-            for (Action<? super Q> task : _deleteTaskList)
+            for (Action<? super AMQQueue> task : _deleteTaskList)
             {
-                task.performAction((Q)this);
+                task.performAction(this);
             }
 
             _deleteTaskList.clear();
@@ -1586,7 +1711,7 @@ abstract class AbstractQueue<E extends Q
             CurrentActor.get().message(_logSubject, QueueMessages.DELETED());
 
         }
-        return getMessageCount();
+        return getQueueDepthMessages();
 
     }
 
@@ -1660,7 +1785,7 @@ abstract class AbstractQueue<E extends Q
 
     }
 
-    public void deliverAsync(QueueConsumer<?,E,Q,L> sub)
+    public void deliverAsync(QueueConsumer<?> sub)
     {
         if(_exclusiveSubscriber == null)
         {
@@ -1674,13 +1799,13 @@ abstract class AbstractQueue<E extends Q
 
     }
 
-    void flushConsumer(QueueConsumer<?,E,Q,L> sub)
+    void flushConsumer(QueueConsumer<?> sub)
     {
 
         flushConsumer(sub, Long.MAX_VALUE);
     }
 
-    boolean flushConsumer(QueueConsumer<?,E,Q,L> sub, long iterations)
+    boolean flushConsumer(QueueConsumer<?> sub, long iterations)
     {
         boolean atTail = false;
         final boolean keepSendLockHeld = iterations <=  AbstractQueue.MAX_ASYNC_DELIVERIES;
@@ -1757,7 +1882,7 @@ abstract class AbstractQueue<E extends Q
      * @param batch true if processing can be batched
      * @return true if we have completed all possible deliveries for this sub.
      */
-    private boolean attemptDelivery(QueueConsumer<?,E,Q,L> sub, boolean batch)
+    private boolean attemptDelivery(QueueConsumer<?> sub, boolean batch)
     {
         boolean atTail = false;
 
@@ -1765,7 +1890,7 @@ abstract class AbstractQueue<E extends Q
         if (subActive)
         {
 
-            E node  = getNextAvailableEntry(sub);
+            QueueEntry node  = getNextAvailableEntry(sub);
 
             if (node != null && node.isAvailable())
             {
@@ -1802,11 +1927,11 @@ abstract class AbstractQueue<E extends Q
 
     protected void advanceAllConsumers()
     {
-        QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
+        QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
         while (consumerNodeIterator.advance())
         {
-            QueueConsumerList.ConsumerNode<E,Q,L> subNode = consumerNodeIterator.getNode();
-            QueueConsumer<?,E,Q,L> sub = subNode.getConsumer();
+            QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode();
+            QueueConsumer sub = subNode.getConsumer();
             if(sub.acquires())
             {
                 getNextAvailableEntry(sub);
@@ -1818,15 +1943,15 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    private E getNextAvailableEntry(final QueueConsumer<?,E,Q,L> sub)
+    private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
     {
-        QueueContext<E,Q,L> context = sub.getQueueContext();
+        QueueContext context = sub.getQueueContext();
         if(context != null)
         {
-            E lastSeen = context.getLastSeenEntry();
-            E releasedNode = context.getReleasedEntry();
+            QueueEntry lastSeen = context.getLastSeenEntry();
+            QueueEntry releasedNode = context.getReleasedEntry();
 
-            E node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+            QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
 
             boolean expired = false;
             while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
@@ -1858,12 +1983,12 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    public boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub)
+    public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer<?> sub)
     {
-        QueueContext<E,Q,L> context = sub.getQueueContext();
+        QueueContext context = sub.getQueueContext();
         if(context != null)
         {
-            E releasedNode = context.getReleasedEntry();
+            QueueEntry releasedNode = context.getReleasedEntry();
             return releasedNode != null && releasedNode.compareTo(entry) < 0;
         }
         else
@@ -1934,11 +2059,11 @@ abstract class AbstractQueue<E extends Q
             boolean allConsumersDone = true;
             boolean consumerDone;
 
-            QueueConsumerList.ConsumerNodeIterator<E,Q,L> consumerNodeIterator = _consumerList.iterator();
+            QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
             //iterate over the subscribers and try to advance their pointer
             while (consumerNodeIterator.advance())
             {
-                QueueConsumer<?,E,Q,L> sub = consumerNodeIterator.getNode().getConsumer();
+                QueueConsumer<?> sub = consumerNodeIterator.getNode().getConsumer();
                 sub.getSendLock();
 
                     try
@@ -2020,11 +2145,11 @@ abstract class AbstractQueue<E extends Q
 
     public void checkMessageStatus()
     {
-        QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> queueListIterator = _entries.iterator();
+        QueueEntryIterator queueListIterator = _entries.iterator();
 
         while (queueListIterator.advance())
         {
-            E node = queueListIterator.getNode();
+            QueueEntry node = queueListIterator.getNode();
             // Only process nodes that are not currently deleted and not dequeued
             if (!node.isDeleted())
             {
@@ -2054,7 +2179,7 @@ abstract class AbstractQueue<E extends Q
 
     }
 
-    public long getMinimumAlertRepeatGap()
+    public long getAlertRepeatGap()
     {
         return _minimumAlertRepeatGap;
     }
@@ -2064,7 +2189,7 @@ abstract class AbstractQueue<E extends Q
         _minimumAlertRepeatGap = minimumAlertRepeatGap;
     }
 
-    public long getMaximumMessageAge()
+    public long getAlertThresholdMessageAge()
     {
         return _maximumMessageAge;
     }
@@ -2082,7 +2207,7 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    public long getMaximumMessageCount()
+    public long getAlertThresholdQueueDepthMessages()
     {
         return _maximumMessageCount;
     }
@@ -2101,7 +2226,7 @@ abstract class AbstractQueue<E extends Q
 
     }
 
-    public long getMaximumQueueDepth()
+    public long getAlertThresholdQueueDepthBytes()
     {
         return _maximumQueueDepth;
     }
@@ -2121,7 +2246,7 @@ abstract class AbstractQueue<E extends Q
 
     }
 
-    public long getMaximumMessageSize()
+    public long getAlertThresholdMessageSize()
     {
         return _maximumMessageSize;
     }
@@ -2139,7 +2264,7 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    public long getCapacity()
+    public long getQueueFlowControlSizeBytes()
     {
         return _capacity;
     }
@@ -2149,7 +2274,7 @@ abstract class AbstractQueue<E extends Q
         _capacity = capacity;
     }
 
-    public long getFlowResumeCapacity()
+    public long getQueueFlowResumeSizeBytes()
     {
         return _flowResumeCapacity;
     }
@@ -2191,12 +2316,12 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    private final class QueueEntryListener implements StateChangeListener<E, QueueEntry.State>
+    private final class QueueEntryListener implements StateChangeListener<MessageInstance, QueueEntry.State>
     {
 
-        private final QueueConsumer<?,E,Q,L> _sub;
+        private final QueueConsumer<?> _sub;
 
-        public QueueEntryListener(final QueueConsumer<?,E,Q,L> sub)
+        public QueueEntryListener(final QueueConsumer<?> sub)
         {
             _sub = sub;
         }
@@ -2212,7 +2337,7 @@ abstract class AbstractQueue<E extends Q
             return System.identityHashCode(_sub);
         }
 
-        public void stateChanged(E entry, QueueEntry.State oldSate, QueueEntry.State newState)
+        public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
         {
             entry.removeStateChangeListener(this);
             deliverAsync(_sub);
@@ -2241,32 +2366,32 @@ abstract class AbstractQueue<E extends Q
         return ids;
     }
 
-    public long getTotalEnqueueSize()
+    public long getTotalEnqueuedBytes()
     {
         return _enqueueSize.get();
     }
 
-    public long getTotalDequeueSize()
+    public long getTotalDequeuedBytes()
     {
         return _dequeueSize.get();
     }
 
-    public long getPersistentByteEnqueues()
+    public long getPersistentEnqueuedBytes()
     {
         return _persistentMessageEnqueueSize.get();
     }
 
-    public long getPersistentByteDequeues()
+    public long getPersistentDequeuedBytes()
     {
         return _persistentMessageDequeueSize.get();
     }
 
-    public long getPersistentMsgEnqueues()
+    public long getPersistentEnqueuedMessages()
     {
         return _persistentMessageEnqueueCount.get();
     }
 
-    public long getPersistentMsgDequeues()
+    public long getPersistentDequeuedMessages()
     {
         return _persistentMessageDequeueCount.get();
     }
@@ -2278,23 +2403,23 @@ abstract class AbstractQueue<E extends Q
         return getName();
     }
 
-    public long getUnackedMessageCount()
+    public long getUnacknowledgedMessages()
     {
         return _unackedMsgCount.get();
     }
 
-    public long getUnackedMessageBytes()
+    public long getUnacknowledgedBytes()
     {
         return _unackedMsgBytes.get();
     }
 
-    public void decrementUnackedMsgCount(E queueEntry)
+    public void decrementUnackedMsgCount(QueueEntry queueEntry)
     {
         _unackedMsgCount.decrementAndGet();
         _unackedMsgBytes.addAndGet(-queueEntry.getSize());
     }
 
-    private void incrementUnackedMsgCount(E entry)
+    private void incrementUnackedMsgCount(QueueEntry entry)
     {
         _unackedMsgCount.incrementAndGet();
         _unackedMsgBytes.addAndGet(entry.getSize());
@@ -2305,7 +2430,8 @@ abstract class AbstractQueue<E extends Q
         return _logActor;
     }
 
-    public int getMaximumDeliveryCount()
+    @Override
+    public int getMaximumDeliveryAttempts()
     {
         return _maximumDeliveryCount;
     }
@@ -2321,12 +2447,15 @@ abstract class AbstractQueue<E extends Q
     private void checkForNotification(ServerMessage<?> msg)
     {
         final Set<NotificationCheck> notificationChecks = getNotificationChecks();
-        final AMQQueue.NotificationListener listener = _notificationListener;
-
+        QueueNotificationListener  listener = _notificationListener;
+        if(listener == null)
+        {
+            listener = NULL_NOTIFICATION_LISTENER;
+        }
         if(listener != null && !notificationChecks.isEmpty())
         {
             final long currentTime = System.currentTimeMillis();
-            final long thresholdTime = currentTime - getMinimumAlertRepeatGap();
+            final long thresholdTime = currentTime - getAlertRepeatGap();
 
             for (NotificationCheck check : notificationChecks)
             {
@@ -2341,7 +2470,7 @@ abstract class AbstractQueue<E extends Q
         }
     }
 
-    public void setNotificationListener(AMQQueue.NotificationListener listener)
+    public void setNotificationListener(QueueNotificationListener  listener)
     {
         _notificationListener = listener;
     }
@@ -2361,7 +2490,7 @@ abstract class AbstractQueue<E extends Q
     public final  <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
                               final InstanceProperties instanceProperties,
                               final ServerTransaction txn,
-                              final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
+                              final Action<? super MessageInstance> postEnqueueAction)
     {
             txn.enqueue(this,message, new ServerTransaction.Action()
             {
@@ -2612,4 +2741,371 @@ abstract class AbstractQueue<E extends Q
             _deleteTask = deleteTask;
         }
     }
+
+    //=============
+
+
+    @Override
+    protected boolean setState(final State currentState, final State desiredState)
+    {   // Only a request for DELETED is acted on here; currentState is never consulted.
+        if(desiredState == State.DELETED)
+        {
+            _virtualHost.removeQueue(this); // removal is delegated to the _virtualHost field
+            return true;
+        }
+        return false; // any other desired state yields false without side effects
+    }
+
+    @Override
+    public String getQueueType()
+    {
+        return null; // no queue type is reported by this implementation
+    }
+
+    @Override
+    public ExclusivityPolicy getExclusive()
+    {
+        return _exclusivityPolicy; // current value of the _exclusivityPolicy field
+    }
+
+    @Override
+    public boolean getNoLocal()
+    {
+        return _noLocal; // current value of the _noLocal field
+    }
+
+    @Override
+    public String getLvqKey()
+    {
+        return null; // no LVQ key is reported by this implementation
+    }
+
+    @Override
+    public String getSortKey()
+    {
+        return null; // no sort key is reported by this implementation
+    }
+
+    @Override
+    public String getMessageGroupKey()
+    {
+        return (String) getAttribute(MESSAGE_GROUP_KEY); // read through the generic getAttribute accessor and cast to String
+    }
+
+    @Override
+    public int getMessageGroupSharedGroups()
+    {
+        return (Integer) getAttribute(MESSAGE_GROUP_SHARED_GROUPS); // auto-unboxed to int: a null attribute value throws NullPointerException
+    }
+
+
+    @Override
+    public boolean isQueueFlowStopped()
+    {
+        return false; // constant result; NOTE(review): not derived from any flow-control state visible here - confirm
+    }
+
+    @Override
+    public int getPriorities()
+    {
+        return 0; // constant result in this implementation
+    }
+
+    @Override
+    public long getBytesIn()
+    {
+        return 0; // constant result; NOTE(review): getTotalEnqueuedBytes() exposes _enqueueSize - confirm whether this should delegate
+    }
+
+    @Override
+    public long getBytesOut()
+    {
+        return 0; // constant result; NOTE(review): getTotalDequeuedBytes() exposes _dequeueSize - confirm whether this should delegate
+    }
+
+    @Override
+    public long getMessagesIn()
+    {
+        return 0; // constant result; NOTE(review): looks like a placeholder - confirm
+    }
+
+    @Override
+    public long getMessagesOut()
+    {
+        return 0; // constant result; NOTE(review): looks like a placeholder - confirm
+    }
+
+    @Override
+    public String setName(final String currentName, final String desiredName)
+            throws IllegalStateException, AccessControlException
+    {
+        return null; // both arguments are ignored; no rename is performed here
+    }
+
+    @Override
+    public State getState()
+    {
+        return isDeleted() ? State.DELETED : State.ACTIVE; // only these two states are ever reported, derived from isDeleted()
+    }
+
+    @Override
+    public void setDurable(final boolean durable)
+            throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        // no-op: the requested durability is ignored by this implementation
+    }
+
+    @Override
+    public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
+            throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        return null; // both arguments are ignored; the lifetime policy is not changed here
+    }
+
+    @Override
+    public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
+    {   // Child categories are matched by exact class identity (==), so other Class objects never match.
+        if(clazz == Binding.class)
+        {
+            return (Collection<C>) getBindings(); // unchecked cast; relies on getBindings() holding Binding instances - TODO confirm
+        }
+        else if(clazz == org.apache.qpid.server.model.Consumer.class)
+        {
+            return (Collection<C>) getConsumers(); // unchecked cast; relies on getConsumers() holding model Consumer instances - TODO confirm
+        }
+        else return Collections.emptySet(); // any other child category yields an empty set
+    }
+
+    @Override
+    public <T extends ConfiguredObject> T getParent(final Class<T> clazz)
+    {
+        if(clazz == org.apache.qpid.server.model.VirtualHost.class) // exact class match only
+        {
+            return (T) _virtualHost.getModel(); // unchecked cast of the object returned by _virtualHost.getModel()
+        }
+        return super.getParent(clazz); // every other parent category is resolved by the superclass
+    }
+
+    @Override
+    protected <C extends ConfiguredObject> C addChild(final Class<C> childClass,
+                                                      final Map<String, Object> attributes,
+                                                      final ConfiguredObject... otherParents)
+    {   // Handles only Binding children that name exactly one other parent which is an Exchange; all else goes to super.
+        if(childClass == Binding.class && otherParents.length == 1 && otherParents[0] instanceof Exchange)
+        {
+            final String bindingKey = (String) attributes.get("name"); // the "name" attribute is used as the binding key
+            ((NonDefaultExchange)otherParents[0]).addBinding(bindingKey, this, attributes); // NOTE(review): guard tests Exchange, cast needs NonDefaultExchange - confirm no ClassCastException
+            for(Binding binding : _bindings) // look the binding up again among this queue's _bindings
+            {
+                if(binding.getExchange() == otherParents[0] && binding.getName().equals(bindingKey))
+                {
+                    return (C) binding; // unchecked cast; C is Binding on this branch
+                }
+            }
+            return null; // no binding with that exchange and name was found after addBinding
+        }
+        return super.addChild(childClass, attributes, otherParents);
+    }
+
+    @Override
+    public boolean changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {   // Applies one attribute change by name; returns true when handled here, otherwise the result of super.changeAttribute.
+        try
+        {
+            if(ALERT_REPEAT_GAP.equals(name))
+            {
+                setMinimumAlertRepeatGap((Long) desired); // direct cast: a non-Long value throws ClassCastException (same for the Long casts below)
+                return true;
+            }
+            else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
+            {
+                setMaximumMessageAge((Long) desired);
+                return true;
+            }
+            else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
+            {
+                setMaximumMessageSize((Long) desired);
+                return true;
+            }
+            else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
+            {
+                setMaximumQueueDepth((Long) desired);
+                return true;
+            }
+            else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
+            {
+                setMaximumMessageCount((Long) desired);
+                return true;
+            }
+            else if(ALTERNATE_EXCHANGE.equals(name))
+            {
+                // In future we may want to accept a UUID as an alternative way to identifying the exchange
+                NonDefaultExchange alternateExchange = (NonDefaultExchange) desired;
+                setAlternateExchange(alternateExchange);
+                return true;
+            }
+            else if(EXCLUSIVE.equals(name))
+            {
+                ExclusivityPolicy desiredPolicy; // accepted inputs: null (treated as NONE), an ExclusivityPolicy, or its name as a String
+                if(desired == null)
+                {
+                    desiredPolicy = ExclusivityPolicy.NONE;
+                }
+                else if(desired instanceof  ExclusivityPolicy)
+                {
+                    desiredPolicy = (ExclusivityPolicy)desired;
+                }
+                else if (desired instanceof String)
+                {
+                    desiredPolicy = ExclusivityPolicy.valueOf((String)desired); // valueOf throws IllegalArgumentException for an unknown name
+                }
+                else
+                {
+                    throw new IllegalArgumentException("Cannot set " + Queue.EXCLUSIVE + " property to type " + desired.getClass().getName());
+                }
+                try
+                {
+                    setExclusivityPolicy(desiredPolicy);
+                }
+                catch (MessageSource.ExistingConsumerPreventsExclusive existingConsumerPreventsExclusive)
+                {   // rethrown as IllegalArgumentException with the original exception kept as the cause
+                    throw new IllegalArgumentException("Unable to set exclusivity policy to " + desired + " as an existing combination of consumers prevents this", existingConsumerPreventsExclusive);
+                }
+                return true;
+
+            }
+            else if(MESSAGE_GROUP_KEY.equals(name))
+            {
+                // TODO (not handled here: falls through to super.changeAttribute below, as do the other TODO branches)
+            }
+            else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
+            {
+                // TODO
+            }
+            else if(LVQ_KEY.equals(name))
+            {
+                // TODO
+            }
+            else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
+            {
+                setMaximumDeliveryCount((Integer) desired);
+                return true;
+            }
+            else if(NO_LOCAL.equals(name))
+            {
+                // TODO
+            }
+            else if(OWNER.equals(name))
+            {
+                // TODO
+            }
+            else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
+            {
+                setCapacity((Long) desired);
+                return true;
+            }
+            else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
+            {
+                setFlowResumeCapacity((Long) desired);
+                return true;
+            }
+            else if(QUEUE_FLOW_STOPPED.equals(name))
+            {
+                // TODO
+            }
+            else if(SORT_KEY.equals(name))
+            {
+                // TODO
+            }
+            else if(QUEUE_TYPE.equals(name))
+            {
+                // TODO
+            }
+            else if (DESCRIPTION.equals(name))
+            {
+                setDescription((String) desired);
+                return true;
+            }
+
+            return super.changeAttribute(name, expected, desired);
+        }
+        finally
+        {   // NOTE(review): runs on every exit path, including exceptions and unhandled names - confirm the store write is wanted then
+            if (isDurable())
+            {
+                DurableConfigurationStoreHelper.updateQueue(this.getVirtualHost().getDurableConfigurationStore(),
+                                                            this);
+            }
+        }
+    }
+
+    @Override
+    public Collection<String> getAttributeNames()
+    {
+        return getAttributeNames(getClass()); // delegates to the class-based overload using this object's runtime class
+    }
+    @Override
+    protected void authoriseSetAttribute(String name, Object expected, Object desired) throws AccessControlException
+    {
+        _virtualHost.getSecurityManager().authoriseUpdate(this); // one update check for the whole queue; name/expected/desired are not inspected
+    }
+
+    @Override
+    protected void authoriseSetAttributes(Map<String, Object> attributes) throws AccessControlException
+    {
+        _virtualHost.getSecurityManager().authoriseUpdate(this); // one update check for the whole queue; the attribute map is not inspected
+    }
+
+    @SuppressWarnings("serial") // the double-brace initialiser below declares an anonymous HashMap subclass
+    static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
+        put(ALERT_REPEAT_GAP, Long.class); // attribute name -> target type handed to MapValueConverter.convert in changeAttributes
+        put(ALERT_THRESHOLD_MESSAGE_AGE, Long.class);
+        put(ALERT_THRESHOLD_MESSAGE_SIZE, Long.class);
+        put(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, Long.class);
+        put(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, Long.class);
+        put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class);
+        put(QUEUE_FLOW_RESUME_SIZE_BYTES, Long.class);
+        put(MAXIMUM_DELIVERY_ATTEMPTS, Integer.class); // the only Integer-typed entry; changeAttribute casts this one to Integer
+        put(DESCRIPTION, String.class);
+    }});
+
+    @Override
+    protected void changeAttributes(final Map<String, Object> attributes)
+    {
+        Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES); // convert values using the ATTRIBUTE_TYPES table
+        validateAttributes(convertedAttributes); // may throw IllegalConfigurationException; runs before the superclass call
+
+        super.changeAttributes(convertedAttributes); // the converted map, not the caller's map, is passed on
+    }
+
+    private void validateAttributes(Map<String, Object> convertedAttributes)
+    {   // Rejects a flow resume size above the flow control size and any negative number; throws IllegalConfigurationException.
+        Long queueFlowControlSize = (Long) convertedAttributes.get(QUEUE_FLOW_CONTROL_SIZE_BYTES);
+        Long queueFlowControlResumeSize = (Long) convertedAttributes.get(QUEUE_FLOW_RESUME_SIZE_BYTES);
+        if (queueFlowControlSize != null || queueFlowControlResumeSize != null )
+        {   // when only one of the two sizes is supplied, the other is read from the current attribute value
+            if (queueFlowControlSize == null)
+            {
+                queueFlowControlSize = (Long)getAttribute(QUEUE_FLOW_CONTROL_SIZE_BYTES);
+            }
+            if (queueFlowControlResumeSize == null)
+            {
+                queueFlowControlResumeSize = (Long)getAttribute(QUEUE_FLOW_RESUME_SIZE_BYTES);
+            }
+            if (queueFlowControlResumeSize > queueFlowControlSize) // both operands are unboxed; assumes getAttribute returned non-null - TODO confirm
+            {
+                throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size");
+            }
+        }
+        for (Map.Entry<String, Object> entry: convertedAttributes.entrySet())
+        {
+            Object value = entry.getValue();
+            if (value instanceof Number && ((Number)value).longValue() < 0) // zero is accepted; only negative values are rejected
+            {
+                throw new IllegalConfigurationException("Only non-negative integer value can be specified for the attribute "
+                                                        + entry.getKey());
+            }
+        }
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java Wed Feb 26 23:27:39 2014
@@ -28,13 +28,13 @@ import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 
 
-public class AssignedConsumerMessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> implements MessageGroupManager<E,Q,L>
+public class AssignedConsumerMessageGroupManager implements MessageGroupManager
 {
     private static final Logger _logger = LoggerFactory.getLogger(AssignedConsumerMessageGroupManager.class);
 
 
     private final String _groupId;
-    private final ConcurrentHashMap<Integer, QueueConsumer> _groupMap = new ConcurrentHashMap<Integer, QueueConsumer>();
+    private final ConcurrentHashMap<Integer, QueueConsumer<?>> _groupMap = new ConcurrentHashMap<Integer, QueueConsumer<?>>();
     private final int _groupMask;
 
     public AssignedConsumerMessageGroupManager(final String groupId, final int maxGroups)
@@ -53,18 +53,18 @@ public class AssignedConsumerMessageGrou
         return val;
     }
 
-    public QueueConsumer getAssignedConsumer(final E entry)
+    public QueueConsumer<?> getAssignedConsumer(final QueueEntry entry)
     {
         Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
         return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
     }
 
-    public boolean acceptMessage(QueueConsumer<?,E,Q,L> sub, E entry)
+    public boolean acceptMessage(QueueConsumer<?> sub, QueueEntry entry)
     {
         return assignMessage(sub, entry) && entry.acquire(sub);
     }
 
-    private boolean assignMessage(QueueConsumer sub, E entry)
+    private boolean assignMessage(QueueConsumer<?> sub, QueueEntry entry)
     {
         Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
         if(groupVal == null)
@@ -98,24 +98,24 @@ public class AssignedConsumerMessageGrou
         }
     }
     
-    public E findEarliestAssignedAvailableEntry(QueueConsumer sub)
+    public QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer<?> sub)
     {
         EntryFinder visitor = new EntryFinder(sub);
         sub.getQueue().visit(visitor);
         return visitor.getEntry();
     }
 
-    private class EntryFinder implements QueueEntryVisitor<E>
+    private class EntryFinder implements QueueEntryVisitor
     {
-        private E _entry;
-        private QueueConsumer _sub;
+        private QueueEntry _entry;
+        private QueueConsumer<?> _sub;
 
-        public EntryFinder(final QueueConsumer sub)
+        public EntryFinder(final QueueConsumer<?> sub)
         {
             _sub = sub;
         }
 
-        public boolean visit(final E entry)
+        public boolean visit(final QueueEntry entry)
         {
             if(!entry.isAvailable())
             {
@@ -129,7 +129,7 @@ public class AssignedConsumerMessageGrou
             }
 
             Integer group = groupId.hashCode() & _groupMask;
-            Consumer assignedSub = _groupMap.get(group);
+            QueueConsumer<?> assignedSub = _groupMap.get(group);
             if(assignedSub == _sub)
             {
                 _entry = entry;
@@ -141,15 +141,15 @@ public class AssignedConsumerMessageGrou
             }
         }
 
-        public E getEntry()
+        public QueueEntry getEntry()
         {
             return _entry;
         }
     }
 
-    public void clearAssignments(QueueConsumer sub)
+    public void clearAssignments(QueueConsumer<?> sub)
     {
-        Iterator<QueueConsumer> subIter = _groupMap.values().iterator();
+        Iterator<QueueConsumer<?>> subIter = _groupMap.values().iterator();
         while(subIter.hasNext())
         {
             if(subIter.next() == sub)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Wed Feb 26 23:27:39 2014
@@ -27,9 +27,9 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.Action;
 
-public interface BaseQueue<C extends Consumer> extends TransactionLogResource
+public interface BaseQueue extends TransactionLogResource
 {
-    void enqueue(ServerMessage message, Action<? super MessageInstance<?,C>> action);
+    void enqueue(ServerMessage message, Action<? super MessageInstance> action);
 
     boolean isDurable();
     boolean isDeleted();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java Wed Feb 26 23:27:39 2014
@@ -28,7 +28,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-public class ConflationQueue extends AbstractQueue<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
+public class ConflationQueue extends AbstractQueue
 {
     public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key";
 
@@ -52,7 +52,7 @@ public class ConflationQueue extends Abs
 
     public String getConflationKey()
     {
-        return getEntries().getConflationKey();
+        return ((ConflationQueueList)getEntries()).getConflationKey();
     }
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Wed Feb 26 23:27:39 2014
@@ -32,17 +32,17 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class ConflationQueueList extends OrderedQueueEntryList<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
+public class ConflationQueueList extends OrderedQueueEntryList
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class);
 
-    private static final HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> HEAD_CREATOR = new HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>()
+    private static final HeadCreator HEAD_CREATOR = new HeadCreator()
     {
 
         @Override
-        public ConflationQueueEntry createHead(final ConflationQueueList list)
+        public ConflationQueueEntry createHead(final QueueEntryList list)
         {
-            return list.createHead();
+            return ((ConflationQueueList)list).createHead();
         }
     };
 
@@ -75,13 +75,15 @@ public class ConflationQueueList extends
         return new ConflationQueueEntry(this, message);
     }
 
+
+
     /**
      * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary.
      */
     @Override
     public ConflationQueueEntry add(final ServerMessage message)
     {
-        final ConflationQueueEntry addedEntry = super.add(message);
+        final ConflationQueueEntry addedEntry = (ConflationQueueEntry) super.add(message);
 
         final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
         if (keyValue != null)
@@ -192,7 +194,7 @@ public class ConflationQueueList extends
         }
     }
 
-    final class ConflationQueueEntry extends OrderedQueueEntry<ConflationQueueEntry, ConflationQueue, ConflationQueueList>
+    final class ConflationQueueEntry extends OrderedQueueEntry
     {
 
         private AtomicReference<ConflationQueueEntry> _latestValueReference;
@@ -252,7 +254,7 @@ public class ConflationQueueList extends
         return Collections.unmodifiableMap(_latestValuesMap);
     }
 
-    static class Factory implements QueueEntryListFactory<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
+    static class Factory implements QueueEntryListFactory
     {
         private final String _conflationKey;
 
@@ -262,9 +264,9 @@ public class ConflationQueueList extends
         }
 
         @Override
-        public ConflationQueueList createQueueEntryList(final ConflationQueue queue)
+        public ConflationQueueList createQueueEntryList(final AMQQueue<?> queue)
         {
-            return new ConflationQueueList(queue, _conflationKey);
+            return new ConflationQueueList((ConflationQueue)queue, _conflationKey);
         }
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Wed Feb 26 23:27:39 2014
@@ -32,22 +32,22 @@ import org.apache.qpid.server.message.Se
 import java.util.HashMap;
 import java.util.Map;
 
-public class DefinedGroupMessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> implements MessageGroupManager<E,Q,L>
+public class DefinedGroupMessageGroupManager implements MessageGroupManager
 {
     private static final Logger _logger = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class);
 
     private final String _groupId;
     private final String _defaultGroup;
     private final Map<Object, Group> _groupMap = new HashMap<Object, Group>();
-    private final ConsumerResetHelper<E,Q,L> _resetHelper;
+    private final ConsumerResetHelper _resetHelper;
 
     private final class Group
     {
         private final Object _group;
-        private QueueConsumer<?,E,Q,L> _consumer;
+        private QueueConsumer<?> _consumer;
         private int _activeCount;
 
-        private Group(final Object key, final QueueConsumer<?,E,Q,L> consumer)
+        private Group(final Object key, final QueueConsumer<?> consumer)
         {
             _group = key;
             _consumer = consumer;
@@ -104,7 +104,7 @@ public class DefinedGroupMessageGroupMan
             return !(_consumer == null || (_activeCount == 0 && _consumer.isClosed()));
         }
 
-        public QueueConsumer<?,E,Q,L> getConsumer()
+        public QueueConsumer<?> getConsumer()
         {
             return _consumer;
         }
@@ -120,14 +120,14 @@ public class DefinedGroupMessageGroupMan
         }
     }
 
-    public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper<E,Q,L> resetHelper)
+    public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, ConsumerResetHelper resetHelper)
     {
         _groupId = groupId;
         _defaultGroup = defaultGroup;
         _resetHelper = resetHelper;
     }
     
-    public synchronized QueueConsumer<?,E,Q,L> getAssignedConsumer(final E entry)
+    public synchronized QueueConsumer<?> getAssignedConsumer(final QueueEntry entry)
     {
         Object groupId = getKey(entry);
 
@@ -135,12 +135,12 @@ public class DefinedGroupMessageGroupMan
         return group == null || !group.isValid() ? null : group.getConsumer();
     }
 
-    public synchronized boolean acceptMessage(final QueueConsumer<?,E,Q,L> sub, final E entry)
+    public synchronized boolean acceptMessage(final QueueConsumer<?> sub, final QueueEntry entry)
     {
         return assignMessage(sub, entry) && entry.acquire(sub);
     }
 
-    private boolean assignMessage(final QueueConsumer<?,E,Q,L> sub, final E entry)
+    private boolean assignMessage(final QueueConsumer<?> sub, final QueueEntry entry)
     {
         Object groupId = getKey(entry);
         Group group = _groupMap.get(groupId);
@@ -173,24 +173,24 @@ public class DefinedGroupMessageGroupMan
         }
     }
 
-    public synchronized E findEarliestAssignedAvailableEntry(final QueueConsumer<?,E,Q,L> sub)
+    public synchronized QueueEntry findEarliestAssignedAvailableEntry(final QueueConsumer<?> sub)
     {
         EntryFinder visitor = new EntryFinder(sub);
         sub.getQueue().visit(visitor);
         return visitor.getEntry();
     }
 
-    private class EntryFinder implements QueueEntryVisitor<E>
+    private class EntryFinder implements QueueEntryVisitor
     {
-        private E _entry;
+        private QueueEntry _entry;
         private QueueConsumer _sub;
 
-        public EntryFinder(final QueueConsumer sub)
+        public EntryFinder(final QueueConsumer<?> sub)
         {
             _sub = sub;
         }
 
-        public boolean visit(final E entry)
+        public boolean visit(final QueueEntry entry)
         {
             if(!entry.isAvailable())
             {
@@ -211,18 +211,18 @@ public class DefinedGroupMessageGroupMan
             }
         }
 
-        public E getEntry()
+        public QueueEntry getEntry()
         {
             return _entry;
         }
     }
 
     
-    public void clearAssignments(final QueueConsumer sub)
+    public void clearAssignments(final QueueConsumer<?> sub)
     {
     }
     
-    private Object getKey(E entry)
+    private Object getKey(QueueEntry entry)
     {
         ServerMessage message = entry.getMessage();
         AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
@@ -234,7 +234,7 @@ public class DefinedGroupMessageGroupMan
         return groupVal;
     }
 
-    private class GroupStateChangeListener implements StateChangeListener<MessageInstance<?, ? extends QueueConsumer>, QueueEntry.State>
+    private class GroupStateChangeListener implements StateChangeListener<MessageInstance, MessageInstance.State>
     {
         private final Group _group;
 
@@ -243,7 +243,7 @@ public class DefinedGroupMessageGroupMan
             _group = group;
         }
 
-        public void stateChanged(final MessageInstance<?, ? extends QueueConsumer> entry,
+        public void stateChanged(final MessageInstance entry,
                                  final MessageInstance.State oldState,
                                  final MessageInstance.State newState)
         {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java Wed Feb 26 23:27:39 2014
@@ -20,20 +20,47 @@
  */
 package org.apache.qpid.server.queue;
 
-public interface MessageGroupManager<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>>
+/**
+ * Assigns the entries of a queue to consumers by message group. Implemented in this
+ * package by AssignedConsumerMessageGroupManager and DefinedGroupMessageGroupManager.
+ */
+public interface MessageGroupManager
 {
-    public interface ConsumerResetHelper<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>>
+    /**
+     * Helper passed to DefinedGroupMessageGroupManager at construction. NOTE(review): no
+     * implementation is visible in this change; confirm the exact contract at the caller.
+     */
+    public interface ConsumerResetHelper
     {
-        public void resetSubPointersForGroups(QueueConsumer<?,E,Q,L> consumer, boolean clearAssignments);
+        public void resetSubPointersForGroups(QueueConsumer<?> consumer, boolean clearAssignments);
 
-        boolean isEntryAheadOfConsumer(E entry, QueueConsumer<?,E,Q,L> sub);
+        boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer<?> sub);
     }
 
-    QueueConsumer getAssignedConsumer(E entry);
+    /**
+     * Looks up the consumer to which the entry's message group is currently assigned.
+     *
+     * @param entry entry whose group assignment is queried
+     * @return the assigned consumer, or {@code null} if there is none
+     */
+    QueueConsumer<?> getAssignedConsumer(QueueEntry entry);
 
-    boolean acceptMessage(QueueConsumer<?,E,Q,L> sub, E entry);
+    /**
+     * Offers the entry to the consumer; the implementations in this package assign first, then acquire.
+     *
+     * @return {@code true} only if the entry was both assigned to and acquired by {@code sub}
+     */
+    boolean acceptMessage(QueueConsumer<?> sub, QueueEntry entry);
 
-    E findEarliestAssignedAvailableEntry(QueueConsumer<?,E,Q,L> sub);
+    /**
+     * Scans the consumer's queue for an available entry whose group is assigned to it.
+     *
+     * @return the entry found, or {@code null} if the scan selects none
+     */
+    QueueEntry findEarliestAssignedAvailableEntry(QueueConsumer<?> sub);
 
-    void clearAssignments(QueueConsumer sub);
+    /**
+     * Releases group assignments held by the consumer; an implementation may make this a no-op.
+     */
+    void clearAssignments(QueueConsumer<?> sub);
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org