You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text rendering.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/27 00:27:43 UTC

svn commit: r1572343 [4/7] - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/binding/ broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/...

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Wed Feb 26 23:27:39 2014
@@ -1,150 +1,152 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.message.ServerMessage;
-
-public enum NotificationCheck
-{
-
-    MESSAGE_COUNT_ALERT
-    {
-        public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
-        {
-            int msgCount;
-            final long maximumMessageCount = queue.getMaximumMessageCount();
-            if (maximumMessageCount!= 0 && (msgCount =  queue.getMessageCount()) >= maximumMessageCount)
-            {
-                String notificationMsg = msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.";
-
-                logNotification(this, queue, notificationMsg);
-                listener.notifyClients(this, queue, notificationMsg);
-                return true;
-            }
-            return false;
-        }
-    },
-    MESSAGE_SIZE_ALERT(true)
-    {
-        public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
-        {
-            final long maximumMessageSize = queue.getMaximumMessageSize();
-            if(maximumMessageSize != 0)
-            {
-                // Check for threshold message size
-                long messageSize;
-                messageSize = (msg == null) ? 0 : msg.getSize();
-
-                if (messageSize >= maximumMessageSize)
-                {
-                    String notificationMsg = messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]";
-
-                    logNotification(this, queue, notificationMsg);
-                    listener.notifyClients(this, queue, notificationMsg);
-                    return true;
-                }
-            }
-            return false;
-        }
-
-    },
-    QUEUE_DEPTH_ALERT
-    {
-        public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
-        {
-            // Check for threshold queue depth in bytes
-            final long maximumQueueDepth = queue.getMaximumQueueDepth();
-
-            if(maximumQueueDepth != 0)
-            {
-                final long queueDepth = queue.getQueueDepth();
-
-                if (queueDepth >= maximumQueueDepth)
-                {
-                    String notificationMsg = (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.";
-
-                    logNotification(this, queue, notificationMsg);
-                    listener.notifyClients(this, queue, notificationMsg);
-                    return true;
-                }
-            }
-            return false;
-        }
-
-    },
-    MESSAGE_AGE_ALERT
-    {
-        public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
-        {
-
-            final long maxMessageAge = queue.getMaximumMessageAge();
-            if(maxMessageAge != 0)
-            {
-                final long currentTime = System.currentTimeMillis();
-                final long thresholdTime = currentTime - maxMessageAge;
-                final long firstArrivalTime = queue.getOldestMessageArrivalTime();
-
-                if(firstArrivalTime < thresholdTime)
-                {
-                    long oldestAge = currentTime - firstArrivalTime;
-                    String notificationMsg = (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.";
-
-                    logNotification(this, queue, notificationMsg);
-                    listener.notifyClients(this, queue, notificationMsg);
-
-                    return true;
-                }
-            }
-            return false;
-
-        }
-
-    }
-    ;
-
-    private static final Logger LOGGER = Logger.getLogger(NotificationCheck.class);
-
-    private final boolean _messageSpecific;
-
-    NotificationCheck()
-    {
-        this(false);
-    }
-
-    NotificationCheck(boolean messageSpecific)
-    {
-        _messageSpecific = messageSpecific;
-    }
-
-    public boolean isMessageSpecific()
-    {
-        return _messageSpecific;
-    }
-
-    public abstract boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener);
-
-    //A bit of a hack, only for use until we do the logging listener
-    private static void logNotification(NotificationCheck notification, AMQQueue queue, String notificationMsg)
-    {
-        LOGGER.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.QueueNotificationListener;
+
+
+public enum NotificationCheck
+{
+
+    MESSAGE_COUNT_ALERT
+    {
+        public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener listener)
+        {
+            int msgCount;
+            final long maximumMessageCount = queue.getAlertThresholdQueueDepthMessages();
+            if (maximumMessageCount!= 0 && (msgCount =  queue.getQueueDepthMessages()) >= maximumMessageCount)
+            {
+                String notificationMsg = msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.";
+
+                logNotification(this, queue, notificationMsg);
+                listener.notifyClients(this, queue, notificationMsg);
+                return true;
+            }
+            return false;
+        }
+    },
+    MESSAGE_SIZE_ALERT(true)
+    {
+        public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener  listener)
+        {
+            final long maximumMessageSize = queue.getAlertThresholdMessageSize();
+            if(maximumMessageSize != 0)
+            {
+                // Check for threshold message size
+                long messageSize;
+                messageSize = (msg == null) ? 0 : msg.getSize();
+
+                if (messageSize >= maximumMessageSize)
+                {
+                    String notificationMsg = messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]";
+
+                    logNotification(this, queue, notificationMsg);
+                    listener.notifyClients(this, queue, notificationMsg);
+                    return true;
+                }
+            }
+            return false;
+        }
+
+    },
+    QUEUE_DEPTH_ALERT
+    {
+        public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener  listener)
+        {
+            // Check for threshold queue depth in bytes
+            final long maximumQueueDepth = queue.getAlertThresholdQueueDepthBytes();
+
+            if(maximumQueueDepth != 0)
+            {
+                final long queueDepth = queue.getQueueDepthBytes();
+
+                if (queueDepth >= maximumQueueDepth)
+                {
+                    String notificationMsg = (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.";
+
+                    logNotification(this, queue, notificationMsg);
+                    listener.notifyClients(this, queue, notificationMsg);
+                    return true;
+                }
+            }
+            return false;
+        }
+
+    },
+    MESSAGE_AGE_ALERT
+    {
+        public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener  listener)
+        {
+
+            final long maxMessageAge = queue.getAlertThresholdMessageAge();
+            if(maxMessageAge != 0)
+            {
+                final long currentTime = System.currentTimeMillis();
+                final long thresholdTime = currentTime - maxMessageAge;
+                final long firstArrivalTime = queue.getOldestMessageArrivalTime();
+
+                if(firstArrivalTime < thresholdTime)
+                {
+                    long oldestAge = currentTime - firstArrivalTime;
+                    String notificationMsg = (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.";
+
+                    logNotification(this, queue, notificationMsg);
+                    listener.notifyClients(this, queue, notificationMsg);
+
+                    return true;
+                }
+            }
+            return false;
+
+        }
+
+    }
+    ;
+
+    private static final Logger LOGGER = Logger.getLogger(NotificationCheck.class);
+
+    private final boolean _messageSpecific;
+
+    NotificationCheck()
+    {
+        this(false);
+    }
+
+    NotificationCheck(boolean messageSpecific)
+    {
+        _messageSpecific = messageSpecific;
+    }
+
+    public boolean isMessageSpecific()
+    {
+        return _messageSpecific;
+    }
+
+    public abstract boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener  listener);
+
+    //A bit of a hack, only for use until we do the logging listener
+    private static void logNotification(NotificationCheck notification, AMQQueue queue, String notificationMsg)
+    {
+        LOGGER.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
+    }
+}

Propchange: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java Wed Feb 26 23:27:39 2014
@@ -24,43 +24,45 @@ import org.apache.qpid.server.message.Se
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-public abstract class OrderedQueueEntry<E extends OrderedQueueEntry<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends OrderedQueueEntryList<E,Q,L>> extends QueueEntryImpl<E,Q,L>
+public abstract class OrderedQueueEntry extends QueueEntryImpl
 {
     static final AtomicReferenceFieldUpdater<OrderedQueueEntry, OrderedQueueEntry>
                 _nextUpdater =
             AtomicReferenceFieldUpdater.newUpdater
             (OrderedQueueEntry.class, OrderedQueueEntry.class, "_next");
 
-    private volatile E _next;
+    private volatile OrderedQueueEntry _next;
 
-    public OrderedQueueEntry(L queueEntryList)
+    public OrderedQueueEntry(OrderedQueueEntryList queueEntryList)
     {
         super(queueEntryList);
     }
 
-    public OrderedQueueEntry(L queueEntryList, ServerMessage message, final long entryId)
+    public OrderedQueueEntry(OrderedQueueEntryList queueEntryList, ServerMessage message, final long entryId)
     {
         super(queueEntryList, message, entryId);
     }
 
-    public OrderedQueueEntry(L queueEntryList, ServerMessage message)
+    public OrderedQueueEntry(OrderedQueueEntryList queueEntryList, ServerMessage message)
     {
         super(queueEntryList, message);
     }
 
-    public E getNextNode()
+    @Override
+    public OrderedQueueEntry getNextNode()
     {
         return _next;
     }
 
-    public E getNextValidEntry()
+    @Override
+    public OrderedQueueEntry getNextValidEntry()
     {
 
-        E next = getNextNode();
+        OrderedQueueEntry next = getNextNode();
         while(next != null && next.isDeleted())
         {
 
-            final E newNext = next.getNextNode();
+            final OrderedQueueEntry newNext = next.getNextNode();
             if(newNext != null)
             {
                 OrderedQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Wed Feb 26 23:27:39 2014
@@ -26,13 +26,12 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-public abstract class OrderedQueueEntryList<E extends OrderedQueueEntry<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends OrderedQueueEntryList<E,Q,L>> implements
-                                                                                                                                                        QueueEntryListBase<E,Q,L>
+public abstract class OrderedQueueEntryList implements QueueEntryList
 {
 
-    private final E _head;
+    private final OrderedQueueEntry _head;
 
-    private volatile E _tail;
+    private volatile OrderedQueueEntry _tail;
 
     static final AtomicReferenceFieldUpdater<OrderedQueueEntryList, OrderedQueueEntry>
             _tailUpdater =
@@ -40,27 +39,27 @@ public abstract class OrderedQueueEntryL
         (OrderedQueueEntryList.class, OrderedQueueEntry.class, "_tail");
 
 
-    private final Q _queue;
+    private final AMQQueue _queue;
 
     static final AtomicReferenceFieldUpdater<OrderedQueueEntry, OrderedQueueEntry>
                 _nextUpdater = OrderedQueueEntry._nextUpdater;
 
     private AtomicLong _scavenges = new AtomicLong(0L);
     private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
-    private final AtomicReference<E> _unscavengedHWM = new AtomicReference<E>();
+    private final AtomicReference<QueueEntry> _unscavengedHWM = new AtomicReference<QueueEntry>();
 
 
-    public OrderedQueueEntryList(Q queue, HeadCreator<E,Q,L> headCreator)
+    public OrderedQueueEntryList(AMQQueue queue, HeadCreator headCreator)
     {
         _queue = queue;
-        _head = headCreator.createHead((L)this);
+        _head = headCreator.createHead(this);
         _tail = _head;
     }
 
     void scavenge()
     {
-        E hwm = _unscavengedHWM.getAndSet(null);
-        E next = _head.getNextValidEntry();
+        QueueEntry hwm = _unscavengedHWM.getAndSet(null);
+        QueueEntry next = _head.getNextValidEntry();
 
         if(hwm != null)
         {
@@ -72,15 +71,15 @@ public abstract class OrderedQueueEntryL
     }
 
 
-    public Q getQueue()
+    public AMQQueue getQueue()
     {
         return _queue;
     }
 
 
-    public E add(ServerMessage message)
+    public QueueEntry add(ServerMessage message)
     {
-        E node = createQueueEntry(message);
+        OrderedQueueEntry node = createQueueEntry(message);
         for (;;)
         {
             OrderedQueueEntry tail = _tail;
@@ -105,23 +104,24 @@ public abstract class OrderedQueueEntryL
         }
     }
 
-    abstract protected E createQueueEntry(ServerMessage<?> message);
+    abstract protected OrderedQueueEntry createQueueEntry(ServerMessage<?> message);
 
-    public E next(E node)
+    @Override
+    public QueueEntry next(QueueEntry node)
     {
         return node.getNextValidEntry();
     }
 
-    public static interface HeadCreator<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>>
+    public static interface HeadCreator
     {
-        E createHead(L list);
+        OrderedQueueEntry createHead(QueueEntryList list);
     }
 
-    public static class QueueEntryIteratorImpl<E extends OrderedQueueEntry<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends OrderedQueueEntryList<E,Q,L>> implements QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>>
+    public static class QueueEntryIteratorImpl implements QueueEntryIterator
     {
-        private E _lastNode;
+        private QueueEntry _lastNode;
 
-        QueueEntryIteratorImpl(E startNode)
+        QueueEntryIteratorImpl(QueueEntry startNode)
         {
             _lastNode = startNode;
         }
@@ -131,14 +131,14 @@ public abstract class OrderedQueueEntryL
             return _lastNode.getNextValidEntry() == null;
         }
 
-        public E getNode()
+        public QueueEntry getNode()
         {
             return _lastNode;
         }
 
         public boolean advance()
         {
-            E nextValidNode = _lastNode.getNextValidEntry();
+            QueueEntry nextValidNode = _lastNode.getNextValidEntry();
 
             if(nextValidNode != null)
             {
@@ -149,26 +149,26 @@ public abstract class OrderedQueueEntryL
         }
     }
 
-    public QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> iterator()
+    public QueueEntryIterator iterator()
     {
-        return new QueueEntryIteratorImpl<E,Q,L>(_head);
+        return new QueueEntryIteratorImpl(_head);
     }
 
 
-    public E getHead()
+    public QueueEntry getHead()
     {
         return _head;
     }
 
-    public void entryDeleted(E queueEntry)
+    public void entryDeleted(QueueEntry queueEntry)
     {
-        E next = _head.getNextNode();
-        E newNext = _head.getNextValidEntry();
+        QueueEntry next = _head.getNextNode();
+        QueueEntry newNext = _head.getNextValidEntry();
 
         // the head of the queue has not been deleted, hence the deletion must have been mid queue.
         if (next == newNext)
         {
-            E unscavengedHWM = _unscavengedHWM.get();
+            QueueEntry unscavengedHWM = _unscavengedHWM.get();
             while(unscavengedHWM == null || unscavengedHWM.compareTo(queueEntry)<0)
             {
                 _unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry);
@@ -182,7 +182,7 @@ public abstract class OrderedQueueEntryL
         }
         else
         {
-            E unscavengedHWM = _unscavengedHWM.get();
+            QueueEntry unscavengedHWM = _unscavengedHWM.get();
             if(unscavengedHWM != null && (next == null || unscavengedHWM.compareTo(next) < 0))
             {
                 _unscavengedHWM.compareAndSet(unscavengedHWM, null);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Wed Feb 26 23:27:39 2014
@@ -24,30 +24,30 @@ import org.apache.qpid.server.virtualhos
 
 import java.util.Map;
 
-public abstract class OutOfOrderQueue<E extends QueueEntryImpl<E,Q,L>, Q extends OutOfOrderQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> extends AbstractQueue<E,Q,L>
+public abstract class OutOfOrderQueue extends AbstractQueue
 {
 
     protected OutOfOrderQueue(VirtualHost virtualHost,
                               Map<String, Object> attributes,
-                              QueueEntryListFactory<E, Q, L> entryListFactory)
+                              QueueEntryListFactory entryListFactory)
     {
         super(virtualHost, attributes, entryListFactory);
     }
 
     @Override
-    protected void checkConsumersNotAheadOfDelivery(final E entry)
+    protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
     {
         // check that all consumers are not in advance of the entry
-        QueueConsumerList.ConsumerNodeIterator<E,Q,L> subIter = getConsumerList().iterator();
+        QueueConsumerList.ConsumerNodeIterator subIter = getConsumerList().iterator();
         while(subIter.advance() && !entry.isAcquired())
         {
-            final QueueConsumer<?,E,Q,L> consumer = subIter.getNode().getConsumer();
+            final QueueConsumer<?> consumer = subIter.getNode().getConsumer();
             if(!consumer.isClosed())
             {
-                QueueContext<E,Q,L> context = consumer.getQueueContext();
+                QueueContext context = consumer.getQueueContext();
                 if(context != null)
                 {
-                    E released = context.getReleasedEntry();
+                    QueueEntry released = context.getReleasedEntry();
                     while(!entry.isAcquired() && (released == null || released.compareTo(entry) > 0))
                     {
                         if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java Wed Feb 26 23:27:39 2014
@@ -27,7 +27,7 @@ import org.apache.qpid.server.virtualhos
 
 import java.util.Map;
 
-public class PriorityQueue extends OutOfOrderQueue<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList>
+public class PriorityQueue extends OutOfOrderQueue
 {
 
     public static final int DEFAULT_PRIORITY_LEVELS = 10;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Wed Feb 26 23:27:39 2014
@@ -22,23 +22,23 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.ServerMessage;
 
-abstract public class PriorityQueueList extends OrderedQueueEntryList<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList>
+abstract public class PriorityQueueList extends OrderedQueueEntryList
 {
 
 
     public PriorityQueueList(final PriorityQueue queue,
-                             final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> headCreator)
+                             final HeadCreator headCreator)
     {
         super(queue, headCreator);
     }
 
     static class PriorityQueueMasterList extends PriorityQueueList
     {
-        private static final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> DUMMY_HEAD_CREATOR =
-                new HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList>()
+        private static final HeadCreator DUMMY_HEAD_CREATOR =
+                new HeadCreator()
                 {
                     @Override
-                    public PriorityQueueEntry createHead(final PriorityQueueList list)
+                    public PriorityQueueEntry createHead(final QueueEntryList list)
                     {
                         return null;
                     }
@@ -66,11 +66,13 @@ abstract public class PriorityQueueList 
             return _priorities;
         }
 
+        @Override
         public PriorityQueue getQueue()
         {
             return _queue;
         }
 
+        @Override
         public PriorityQueueEntry add(ServerMessage message)
         {
             int index = message.getMessageHeader().getPriority() - _priorityOffset;
@@ -82,7 +84,7 @@ abstract public class PriorityQueueList 
             {
                 index = 0;
             }
-            return _priorityLists[index].add(message);
+            return (PriorityQueueEntry) _priorityLists[index].add(message);
 
         }
 
@@ -92,13 +94,14 @@ abstract public class PriorityQueueList 
             throw new UnsupportedOperationException();
         }
 
-        public PriorityQueueEntry next(PriorityQueueEntry node)
+        @Override
+        public PriorityQueueEntry next(QueueEntry node)
         {
-            PriorityQueueEntry next = node.getNextValidEntry();
+            PriorityQueueEntry next = (PriorityQueueEntry) node.getNextValidEntry();
 
             if(next == null)
             {
-                final PriorityQueueList nodeEntryList = node.getQueueEntryList();
+                final PriorityQueueList nodeEntryList = (PriorityQueueList) ((PriorityQueueEntry)node).getQueueEntryList();
                 int index;
                 for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--)
                 {
@@ -108,16 +111,16 @@ abstract public class PriorityQueueList 
                 while(next == null && index != 0)
                 {
                     index--;
-                    next = _priorityLists[index].getHead().getNextValidEntry();
+                    next = (PriorityQueueEntry) _priorityLists[index].getHead().getNextValidEntry();
                 }
 
             }
             return next;
         }
 
-        private final class PriorityQueueEntryListIterator implements QueueEntryIterator<PriorityQueueEntry, PriorityQueue, PriorityQueueList, QueueConsumer<?,PriorityQueueEntry, PriorityQueue, PriorityQueueList>>
+        private final class PriorityQueueEntryListIterator implements QueueEntryIterator
         {
-            private final QueueEntryIterator<PriorityQueueEntry, PriorityQueue, PriorityQueueList,QueueConsumer<?,PriorityQueueEntry, PriorityQueue, PriorityQueueList>>[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
+            private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
             private PriorityQueueEntry _lastNode;
 
             PriorityQueueEntryListIterator()
@@ -126,10 +129,10 @@ abstract public class PriorityQueueList 
                 {
                     _iterators[i] = _priorityLists[i].iterator();
                 }
-                _lastNode = _iterators[_iterators.length - 1].getNode();
+                _lastNode = (PriorityQueueEntry) _iterators[_iterators.length - 1].getNode();
             }
 
-
+            @Override
             public boolean atTail()
             {
                 for(int i = 0; i < _iterators.length; i++)
@@ -142,18 +145,20 @@ abstract public class PriorityQueueList 
                 return true;
             }
 
+            @Override
             public PriorityQueueEntry getNode()
             {
                 return _lastNode;
             }
 
+            @Override
             public boolean advance()
             {
                 for(int i = _iterators.length-1; i >= 0; i--)
                 {
                     if(_iterators[i].advance())
                     {
-                        _lastNode = _iterators[i].getNode();
+                        _lastNode = (PriorityQueueEntry) _iterators[i].getNode();
                         return true;
                     }
                 }
@@ -161,23 +166,26 @@ abstract public class PriorityQueueList 
             }
         }
 
+        @Override
         public PriorityQueueEntryListIterator iterator()
         {
 
             return new PriorityQueueEntryListIterator();
         }
 
+        @Override
         public PriorityQueueEntry getHead()
         {
-            return _priorityLists[_priorities-1].getHead();
+            return (PriorityQueueEntry) _priorityLists[_priorities-1].getHead();
         }
 
-        public void entryDeleted(final PriorityQueueEntry queueEntry)
+        @Override
+        public void entryDeleted(final QueueEntry queueEntry)
         {
 
         }
     }
-    static class Factory implements QueueEntryListFactory<PriorityQueueEntry, PriorityQueue, PriorityQueueList>
+    static class Factory implements QueueEntryListFactory
     {
         private final int _priorities;
 
@@ -186,20 +194,20 @@ abstract public class PriorityQueueList 
             _priorities = priorities;
         }
 
-        public PriorityQueueList createQueueEntryList(PriorityQueue queue)
+        public PriorityQueueList createQueueEntryList(AMQQueue<?> queue)
         {
-            return new PriorityQueueMasterList(queue, _priorities);
+            return new PriorityQueueMasterList((PriorityQueue) queue, _priorities);
         }
     }
 
     static class PriorityQueueEntrySubList extends PriorityQueueList
     {
-        private static final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> HEAD_CREATOR = new HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList>()
+        private static final HeadCreator HEAD_CREATOR = new HeadCreator()
         {
             @Override
-            public PriorityQueueEntry createHead(final PriorityQueueList list)
+            public PriorityQueueEntry createHead(final QueueEntryList list)
             {
-                return new PriorityQueueEntry(list);
+                return new PriorityQueueEntry((PriorityQueueList) list);
             }
         };
         private int _listPriority;
@@ -222,7 +230,7 @@ abstract public class PriorityQueueList 
         }
     }
 
-    static class PriorityQueueEntry extends OrderedQueueEntry<PriorityQueueEntry, PriorityQueue, PriorityQueueList>
+    static class PriorityQueueEntry extends OrderedQueueEntry
     {
         private PriorityQueueEntry(final PriorityQueueList queueEntryList)
         {
@@ -235,9 +243,10 @@ abstract public class PriorityQueueList 
         }
 
         @Override
-        public int compareTo(final PriorityQueueEntry o)
+        public int compareTo(final QueueEntry o)
         {
-            PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)o.getQueueEntryList();
+            PriorityQueueEntry other = (PriorityQueueEntry)o;
+            PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)other.getQueueEntryList();
             int otherPriority = pqel.getListPriority();
             int thisPriority = ((PriorityQueueEntrySubList) getQueueEntryList()).getListPriority();
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Wed Feb 26 23:27:39 2014
@@ -24,37 +24,30 @@ import org.apache.qpid.server.consumer.C
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageInstance;
 
-interface QueueConsumer<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> extends Consumer
+public interface QueueConsumer<X extends QueueConsumer<X>> extends Consumer, org.apache.qpid.server.model.Consumer<X>
 {
 
     void flushBatched();
 
     void queueEmpty();
 
-    boolean hasInterest(E node);
+    boolean hasInterest(QueueEntry node);
 
-    boolean wouldSuspend(E entry);
+    boolean wouldSuspend(QueueEntry entry);
 
-    void restoreCredit(E entry);
+    void restoreCredit(QueueEntry entry);
 
-    void send(E entry, boolean batch);
+    void send(QueueEntry entry, boolean batch);
 
     void queueDeleted();
 
     SubFlushRunner getRunner();
 
-    Q getQueue();
+    AMQQueue getQueue();
 
-    boolean resend(E e);
+    boolean resend(QueueEntry e);
 
-    public static enum State
-    {
-        ACTIVE,
-        SUSPENDED,
-        CLOSED
-    }
+    MessageInstance.ConsumerAcquiredState<X> getOwningState();
 
-    MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState();
-
-    QueueContext<E,Q,L> getQueueContext();
+    QueueContext getQueueContext();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Wed Feb 26 23:27:39 2014
@@ -31,14 +31,23 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.adapter.AbstractConfiguredObject;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.util.StateChangeListener;
 
+import java.security.AccessControlException;
 import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumMap;
 import java.util.EnumSet;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -46,17 +55,19 @@ import java.util.concurrent.locks.Reentr
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
 
-class QueueConsumerImpl<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> implements QueueConsumer<T,E,Q,L>
+class QueueConsumerImpl
+    extends AbstractConfiguredObject<QueueConsumerImpl>
+        implements QueueConsumer<QueueConsumerImpl>
 {
 
 
     private static final Logger _logger = Logger.getLogger(QueueConsumerImpl.class);
     private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
     private final AtomicBoolean _closed = new AtomicBoolean(false);
-    private final long _id;
+    private final long _consumerNumber;
     private final Lock _stateChangeLock = new ReentrantLock();
     private final long _createTime = System.currentTimeMillis();
-    private final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>>(this);
+    private final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumerImpl>(this);
     private final boolean _acquires;
     private final boolean _seesRequeues;
     private final String _consumerName;
@@ -66,8 +77,9 @@ class QueueConsumerImpl<T extends Consum
     private final FilterManager _filters;
     private final Class<? extends ServerMessage> _messageClass;
     private final Object _sessionReference;
-    private final Q _queue;
-    private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
+    private final AbstractQueue _queue;
+    private final boolean _exclusive;
+    private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber())
                                                       + "(UNKNOWN)"
                                                       + "] ");
 
@@ -77,14 +89,14 @@ class QueueConsumerImpl<T extends Consum
     static
     {
         STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE);
-        STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.SUSPENDED);
-        STATE_MAP.put(ConsumerTarget.State.CLOSED, State.CLOSED);
+        STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.QUIESCED);
+        STATE_MAP.put(ConsumerTarget.State.CLOSED, State.DELETED);
     }
 
-    private final T _target;
+    private final ConsumerTarget _target;
     private final SubFlushRunner _runner = new SubFlushRunner(this);
-    private volatile QueueContext<E,Q,L> _queueContext;
-    private StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> _stateListener = new StateChangeListener<QueueConsumerImpl<T,E,Q,L>, State>()
+    private volatile QueueContext _queueContext;
+    private StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>()
     {
         public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
         {
@@ -93,16 +105,18 @@ class QueueConsumerImpl<T extends Consum
     };
     private final boolean _noLocal;
 
-    QueueConsumerImpl(final Q queue,
-                      T target, final String consumerName,
+    QueueConsumerImpl(final AbstractQueue queue,
+                      ConsumerTarget target, final String consumerName,
                       final FilterManager filters,
                       final Class<? extends ServerMessage> messageClass,
                       EnumSet<Option> optionSet)
     {
-
+        super(UUID.randomUUID(), Collections.<String,Object>emptyMap(),
+              Collections.<String,Object>emptyMap(),
+              queue.getVirtualHost().getTaskExecutor());
         _messageClass = messageClass;
         _sessionReference = target.getSessionModel().getConnectionReference();
-        _id = SUB_ID_GENERATOR.getAndIncrement();
+        _consumerNumber = CONSUMER_NUMBER_GENERATOR.getAndIncrement();
         _filters = filters;
         _acquires = optionSet.contains(Option.ACQUIRES);
         _seesRequeues = optionSet.contains(Option.SEES_REQUEUES);
@@ -111,6 +125,7 @@ class QueueConsumerImpl<T extends Consum
         _target = target;
         _queue = queue;
         _noLocal = optionSet.contains(Option.NO_LOCAL);
+        _exclusive = optionSet.contains(Option.EXCLUSIVE);
         setupLogging(optionSet.contains(Option.EXCLUSIVE));
 
         // Access control
@@ -151,14 +166,14 @@ class QueueConsumerImpl<T extends Consum
         {
             close();
         }
-        final StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> stateListener = getStateListener();
+        final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener();
         if(stateListener != null)
         {
             stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
         }
     }
 
-    public T getTarget()
+    public ConsumerTarget getTarget()
     {
         return _target;
     }
@@ -166,7 +181,7 @@ class QueueConsumerImpl<T extends Consum
     @Override
     public void externalStateChange()
     {
-        getQueue().deliverAsync(this);
+        _queue.deliverAsync(this);
     }
 
     @Override
@@ -229,12 +244,12 @@ class QueueConsumerImpl<T extends Consum
         _target.queueDeleted();
     }
 
-    public boolean wouldSuspend(final E msg)
+    public boolean wouldSuspend(final QueueEntry msg)
     {
         return !_target.allocateCredit(msg.getMessage());
     }
 
-    public void restoreCredit(final E queueEntry)
+    public void restoreCredit(final QueueEntry queueEntry)
     {
         _target.restoreCredit(queueEntry.getMessage());
     }
@@ -244,12 +259,12 @@ class QueueConsumerImpl<T extends Consum
         _target.queueEmpty();
     }
 
-    State getState()
+    public State getState()
     {
         return STATE_MAP.get(_target.getState());
     }
 
-    public final Q getQueue()
+    public final AMQQueue getQueue()
     {
         return _queue;
     }
@@ -258,7 +273,7 @@ class QueueConsumerImpl<T extends Consum
     {
         String queueString = new QueueLogSubject(_queue).toLogString();
 
-        _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
+        _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber())
                              + "("
                              // queueString is [vh(/{0})/qu({1}) ] so need to trim
                              //                ^                ^^
@@ -289,10 +304,10 @@ class QueueConsumerImpl<T extends Consum
     @Override
     public final void flush()
     {
-        getQueue().flushConsumer(this);
+        _queue.flushConsumer(this);
     }
 
-    public boolean resend(final E entry)
+    public boolean resend(final QueueEntry entry)
     {
         return getQueue().resend(entry, this);
     }
@@ -302,27 +317,27 @@ class QueueConsumerImpl<T extends Consum
         return _runner;
     }
 
-    public final long getId()
+    public final long getConsumerNumber()
     {
-        return _id;
+        return _consumerNumber;
     }
 
-    public final StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> getStateListener()
+    public final StateChangeListener<? super QueueConsumerImpl, State> getStateListener()
     {
         return _stateListener;
     }
 
-    public final void setStateListener(StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> listener)
+    public final void setStateListener(StateChangeListener<? super QueueConsumerImpl, State> listener)
     {
         _stateListener = listener;
     }
 
-    public final QueueContext<E,Q,L> getQueueContext()
+    public final QueueContext getQueueContext()
     {
         return _queueContext;
     }
 
-    final void setQueueContext(QueueContext<E,Q,L> queueContext)
+    final void setQueueContext(QueueContext queueContext)
     {
         _queueContext = queueContext;
     }
@@ -334,10 +349,10 @@ class QueueConsumerImpl<T extends Consum
 
     public final boolean isClosed()
     {
-        return getState() == State.CLOSED;
+        return getState() == State.DELETED;
     }
 
-    public final boolean hasInterest(E entry)
+    public final boolean hasInterest(QueueEntry entry)
     {
        //check that the message hasn't been rejected
         if (entry.isRejectedBy(this))
@@ -412,7 +427,7 @@ class QueueConsumerImpl<T extends Consum
         return _createTime;
     }
 
-    public final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState()
+    public final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> getOwningState()
     {
         return _owningState;
     }
@@ -447,11 +462,126 @@ class QueueConsumerImpl<T extends Consum
         return _deliveredCount.longValue();
     }
 
-    public final void send(final E entry, final boolean batch)
+    public final void send(final QueueEntry entry, final boolean batch)
     {
         _deliveredCount.incrementAndGet();
         ServerMessage message = entry.getMessage();
         _deliveredBytes.addAndGet(message.getSize());
         _target.send(entry, batch);
     }
+
+    @Override
+    protected boolean setState(final State currentState, final State desiredState)
+    {
+        return false;
+    }
+
+    @Override
+    public String getDistributionMode()
+    {
+        return acquires() ? "MOVE" : "COPY";
+    }
+
+    @Override
+    public String getSettlementMode()
+    {
+        return null;
+    }
+
+    @Override
+    public boolean isExclusive()
+    {
+        return _exclusive;
+    }
+
+    @Override
+    public boolean isNoLocal()
+    {
+        return _noLocal; // value captured from Option.NO_LOCAL in the constructor; the previous self-call recursed until StackOverflowError
+    }
+
+    @Override
+    public String getSelector()
+    {
+        return null;
+    }
+
+    @Override
+    public String setName(final String currentName, final String desiredName)
+            throws IllegalStateException, AccessControlException
+    {
+        return null;
+    }
+
+    @Override
+    public boolean isDurable()
+    {
+        return false;
+    }
+
+    @Override
+    public void setDurable(final boolean durable)
+            throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+
+    }
+
+    @Override
+    public LifetimePolicy getLifetimePolicy()
+    {
+        return LifetimePolicy.DELETE_ON_SESSION_END;
+    }
+
+    @Override
+    public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
+            throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        return null;
+    }
+
+    @Override
+    public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
+    {
+        return Collections.<C>emptyList();
+    }
+
+    @Override
+    public Object getAttribute(final String name)
+    {
+        if(ID.equals(name))
+        {
+            return getId();
+        }
+        else if(NAME.equals(name))
+        {
+            return getName();
+        }
+        else if(DURABLE.equals(name))
+        {
+            return isDurable();
+        }
+        else if(DISTRIBUTION_MODE.equals(name))
+        {
+            return getDistributionMode();
+        }
+        else if(SETTLEMENT_MODE.equals(name))
+        {
+            return getSettlementMode();
+        }
+        else if(LIFETIME_POLICY.equals(name))
+        {
+            return getLifetimePolicy();
+        }
+        else if(EXCLUSIVE.equals(name))
+        {
+            return isExclusive();
+        }
+        return super.getAttribute(name);
+    }
+
+    @Override
+    public Collection<String> getAttributeNames()
+    {
+        return getAttributeNames(getClass());
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java Wed Feb 26 23:27:39 2014
@@ -24,19 +24,19 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-class QueueConsumerList<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E, Q,L>, L extends QueueEntryListBase<E,Q,L>>
+class QueueConsumerList
 {
-    private final ConsumerNode<E,Q,L> _head = new ConsumerNode<E,Q,L>();
+    private final ConsumerNode _head = new ConsumerNode();
 
-    private final AtomicReference<ConsumerNode<E,Q,L>> _tail = new AtomicReference<ConsumerNode<E,Q,L>>(_head);
-    private final AtomicReference<ConsumerNode<E,Q,L>> _subNodeMarker = new AtomicReference<ConsumerNode<E,Q,L>>(_head);
+    private final AtomicReference<ConsumerNode> _tail = new AtomicReference<ConsumerNode>(_head);
+    private final AtomicReference<ConsumerNode> _subNodeMarker = new AtomicReference<ConsumerNode>(_head);
     private final AtomicInteger _size = new AtomicInteger();
 
-    public static final class ConsumerNode<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E, Q,L>, L extends QueueEntryListBase<E,Q,L>>
+    public static final class ConsumerNode
     {
         private final AtomicBoolean _deleted = new AtomicBoolean();
-        private final AtomicReference<ConsumerNode<E,Q,L>> _next = new AtomicReference<ConsumerNode<E,Q,L>>();
-        private final QueueConsumer<?,E,Q,L> _sub;
+        private final AtomicReference<ConsumerNode> _next = new AtomicReference<ConsumerNode>();
+        private final QueueConsumer<?> _sub;
 
         public ConsumerNode()
         {
@@ -45,7 +45,7 @@ class QueueConsumerList<E extends QueueE
             _deleted.set(true);
         }
 
-        public ConsumerNode(final QueueConsumer<?,E,Q,L> sub)
+        public ConsumerNode(final QueueConsumer<?> sub)
         {
             //used for regular node construction
             _sub = sub;
@@ -57,12 +57,12 @@ class QueueConsumerList<E extends QueueE
          *
          * @return the next non-deleted node, or null if none was found.
          */
-        public ConsumerNode<E,Q,L> findNext()
+        public ConsumerNode findNext()
         {
-            ConsumerNode<E,Q,L> next = nextNode();
+            ConsumerNode next = nextNode();
             while(next != null && next.isDeleted())
             {
-                final ConsumerNode<E,Q,L> newNext = next.nextNode();
+                final ConsumerNode newNext = next.nextNode();
                 if(newNext != null)
                 {
                     //try to move our _next reference forward to the 'newNext'
@@ -86,7 +86,7 @@ class QueueConsumerList<E extends QueueE
          *
          * @return the immediately next node in the structure, or null if at the tail.
          */
-        protected ConsumerNode<E,Q,L> nextNode()
+        protected ConsumerNode nextNode()
         {
             return _next.get();
         }
@@ -97,7 +97,7 @@ class QueueConsumerList<E extends QueueE
          * @param node the ConsumerNode to set as 'next'
          * @return whether the operation succeeded
          */
-        private boolean setNext(final ConsumerNode<E,Q,L> node)
+        private boolean setNext(final ConsumerNode node)
         {
             return _next.compareAndSet(null, node);
         }
@@ -112,18 +112,18 @@ class QueueConsumerList<E extends QueueE
             return _deleted.compareAndSet(false,true);
         }
 
-        public QueueConsumer<?,E,Q,L> getConsumer()
+        public QueueConsumer<?> getConsumer()
         {
             return _sub;
         }
     }
 
-    private void insert(final ConsumerNode<E,Q,L> node, final boolean count)
+    private void insert(final ConsumerNode node, final boolean count)
     {
         for (;;)
         {
-            ConsumerNode<E,Q,L> tail = _tail.get();
-            ConsumerNode<E,Q,L> next = tail.nextNode();
+            ConsumerNode tail = _tail.get();
+            ConsumerNode next = tail.nextNode();
             if (tail == _tail.get())
             {
                 if (next == null)
@@ -146,16 +146,16 @@ class QueueConsumerList<E extends QueueE
         }
     }
 
-    public void add(final QueueConsumer<?,E,Q,L> sub)
+    public void add(final QueueConsumer<?> sub)
     {
-        ConsumerNode<E,Q,L> node = new ConsumerNode<E,Q,L>(sub);
+        ConsumerNode node = new ConsumerNode(sub);
         insert(node, true);
     }
 
-    public boolean remove(final QueueConsumer<?, E,Q,L> sub)
+    public boolean remove(final QueueConsumer<?> sub)
     {
-        ConsumerNode<E,Q,L> prevNode = _head;
-        ConsumerNode<E,Q,L> node = _head.nextNode();
+        ConsumerNode prevNode = _head;
+        ConsumerNode node = _head.nextNode();
 
         while(node != null)
         {
@@ -170,7 +170,7 @@ class QueueConsumerList<E extends QueueE
                     //correctness reasons, however we have just 'deleted'
                     //the tail. Inserting an empty dummy node after it will
                     //let us scavenge the node containing the Consumer.
-                    insert(new ConsumerNode<E,Q,L>(), false);
+                    insert(new ConsumerNode(), false);
                 }
 
                 //advance the next node reference in the 'prevNode' to scavenge
@@ -189,9 +189,9 @@ class QueueConsumerList<E extends QueueE
         return false;
     }
 
-    private void nodeMarkerCleanup(final ConsumerNode<E,Q,L> node)
+    private void nodeMarkerCleanup(final ConsumerNode node)
     {
-        ConsumerNode<E,Q,L> markedNode = _subNodeMarker.get();
+        ConsumerNode markedNode = _subNodeMarker.get();
         if(node == markedNode)
         {
             //if the marked node is the one we are removing, then
@@ -200,7 +200,7 @@ class QueueConsumerList<E extends QueueE
             //into the list and find the next node to use.
             //Because we inserted a dummy if node was the
             //tail, markedNode.nextNode() can never be null.
-            ConsumerNode<E,Q,L> dummy = new ConsumerNode<E,Q,L>();
+            ConsumerNode dummy = new ConsumerNode();
             dummy.setNext(markedNode.nextNode());
 
             //if the CAS fails the marked node has changed, thus
@@ -219,7 +219,7 @@ class QueueConsumerList<E extends QueueE
         }
     }
 
-    public boolean updateMarkedNode(final ConsumerNode<E,Q,L> expected, final ConsumerNode<E,Q,L> nextNode)
+    public boolean updateMarkedNode(final ConsumerNode expected, final ConsumerNode nextNode)
     {
         return _subNodeMarker.compareAndSet(expected, nextNode);
     }
@@ -231,41 +231,41 @@ class QueueConsumerList<E extends QueueE
      *
      * @return the previously marked node (or a dummy if it was subsequently deleted)
      */
-    public ConsumerNode<E,Q,L> getMarkedNode()
+    public ConsumerNode getMarkedNode()
     {
         return _subNodeMarker.get();
     }
 
 
-    public static class ConsumerNodeIterator<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E, Q,L>, L extends QueueEntryListBase<E,Q,L>>
+    public static class ConsumerNodeIterator
     {
-        private ConsumerNode<E,Q,L> _lastNode;
+        private ConsumerNode _lastNode;
 
-        ConsumerNodeIterator(ConsumerNode<E,Q,L> startNode)
+        ConsumerNodeIterator(ConsumerNode startNode)
         {
             _lastNode = startNode;
         }
 
-        public ConsumerNode<E,Q,L> getNode()
+        public ConsumerNode getNode()
         {
             return _lastNode;
         }
 
         public boolean advance()
         {
-            ConsumerNode<E,Q,L> nextNode = _lastNode.findNext();
+            ConsumerNode nextNode = _lastNode.findNext();
             _lastNode = nextNode;
 
             return _lastNode != null;
         }
     }
 
-    public ConsumerNodeIterator<E,Q,L> iterator()
+    public ConsumerNodeIterator iterator()
     {
-        return new ConsumerNodeIterator<E,Q,L>(_head);
+        return new ConsumerNodeIterator(_head);
     }
 
-    public ConsumerNode<E,Q,L> getHead()
+    public ConsumerNode getHead()
     {
         return _head;
     }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java Wed Feb 26 23:27:39 2014
@@ -23,32 +23,32 @@ package org.apache.qpid.server.queue;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-final class QueueContext<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>>
+final class QueueContext
 {
-    private volatile E _lastSeenEntry;
-    private volatile E _releasedEntry;
+    private volatile QueueEntry _lastSeenEntry;
+    private volatile QueueEntry _releasedEntry;
 
-    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
+    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
             _lastSeenUpdater =
         AtomicReferenceFieldUpdater.newUpdater
-        (QueueContext.class, QueueEntryImpl.class, "_lastSeenEntry");
-    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
+        (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
+    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
             _releasedUpdater =
         AtomicReferenceFieldUpdater.newUpdater
-        (QueueContext.class, QueueEntryImpl.class, "_releasedEntry");
+        (QueueContext.class, QueueEntry.class, "_releasedEntry");
 
-    public QueueContext(E head)
+    public QueueContext(QueueEntry head)
     {
         _lastSeenEntry = head;
     }
 
-    public E getLastSeenEntry()
+    public QueueEntry getLastSeenEntry()
     {
         return _lastSeenEntry;
     }
 
 
-    E getReleasedEntry()
+    QueueEntry getReleasedEntry()
     {
         return _releasedEntry;
     }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Wed Feb 26 23:27:39 2014
@@ -23,18 +23,18 @@ package org.apache.qpid.server.queue;
 import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.message.MessageInstance;
 
-public interface QueueEntry<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer> extends MessageInstance<E,C>, Comparable<E>
+public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
 {
 
-    Q getQueue();
+    AMQQueue getQueue();
 
     long getSize();
 
     boolean isQueueDeleted();
 
-    E getNextNode();
+    QueueEntry getNextNode();
 
-    E getNextValidEntry();
+    QueueEntry getNextValidEntry();
 
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Feb 26 23:27:39 2014
@@ -22,7 +22,7 @@ package org.apache.qpid.server.queue;
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
@@ -43,11 +43,11 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-public abstract class QueueEntryImpl<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> implements QueueEntry<E,Q,QueueConsumer<?,E,Q,L>>
+public abstract class QueueEntryImpl implements QueueEntry
 {
     private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
 
-    private final L _queueEntryList;
+    private final QueueEntryList _queueEntryList;
 
     private final MessageReference _message;
 
@@ -62,7 +62,7 @@ public abstract class QueueEntryImpl<E e
         (QueueEntryImpl.class, EntryState.class, "_state");
 
 
-    private volatile Set<StateChangeListener<? super E, State>> _stateChangeListeners;
+    private volatile Set<StateChangeListener<? super QueueEntry, State>> _stateChangeListeners;
 
     private static final
         AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -89,14 +89,14 @@ public abstract class QueueEntryImpl<E e
     private boolean _deliveredToConsumer;
 
 
-    public QueueEntryImpl(L queueEntryList)
+    public QueueEntryImpl(QueueEntryList queueEntryList)
     {
         this(queueEntryList,null,Long.MIN_VALUE);
         _state = DELETED_STATE;
     }
 
 
-    public QueueEntryImpl(L queueEntryList, ServerMessage message, final long entryId)
+    public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message, final long entryId)
     {
         _queueEntryList = queueEntryList;
 
@@ -106,7 +106,7 @@ public abstract class QueueEntryImpl<E e
         populateInstanceProperties();
     }
 
-    public QueueEntryImpl(L queueEntryList, ServerMessage message)
+    public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message)
     {
         _queueEntryList = queueEntryList;
         _message = message == null ? null :  message.newReference();
@@ -137,7 +137,7 @@ public abstract class QueueEntryImpl<E e
         return _entryId;
     }
 
-    public Q getQueue()
+    public AMQQueue getQueue()
     {
         return _queueEntryList.getQueue();
     }
@@ -201,9 +201,9 @@ public abstract class QueueEntryImpl<E e
         return acquired;
     }
 
-    public boolean acquire(QueueConsumer<?,E,Q,L> sub)
+    public boolean acquire(Consumer sub)
     {
-        final boolean acquired = acquire(sub.getOwningState());
+        final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState());
         if(acquired)
         {
             _deliveredToConsumer = true;
@@ -217,7 +217,7 @@ public abstract class QueueEntryImpl<E e
         return (_state instanceof ConsumerAcquiredState);
     }
 
-    public boolean isAcquiredBy(QueueConsumer consumer)
+    public boolean isAcquiredBy(Consumer consumer)
     {
         EntryState state = _state;
         return state instanceof ConsumerAcquiredState
@@ -233,12 +233,12 @@ public abstract class QueueEntryImpl<E e
 
             if(state instanceof ConsumerAcquiredState)
             {
-                getQueue().decrementUnackedMsgCount((E) this);
+                getQueue().decrementUnackedMsgCount(this);
             }
 
             if(!getQueue().isDeleted())
             {
-                getQueue().requeue((E)this);
+                getQueue().requeue(this);
                 if(_stateChangeListeners != null)
                 {
                     notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
@@ -287,7 +287,7 @@ public abstract class QueueEntryImpl<E e
                 _rejectedBy = new HashSet<Long>();
             }
 
-            _rejectedBy.add(consumer.getId());
+            _rejectedBy.add(consumer.getConsumerNumber());
         }
         else
         {
@@ -295,12 +295,12 @@ public abstract class QueueEntryImpl<E e
         }
     }
 
-    public boolean isRejectedBy(QueueConsumer consumer)
+    public boolean isRejectedBy(Consumer consumer)
     {
 
         if (_rejectedBy != null) // We have consumers that rejected this message
         {
-            return _rejectedBy.contains(consumer.getId());
+            return _rejectedBy.contains(consumer.getConsumerNumber());
         }
         else // This message hasn't been rejected yet.
         {
@@ -316,10 +316,10 @@ public abstract class QueueEntryImpl<E e
         {
             if (state instanceof ConsumerAcquiredState)
             {
-                getQueue().decrementUnackedMsgCount((E) this);
+                getQueue().decrementUnackedMsgCount(this);
             }
 
-            getQueue().dequeue((E)this);
+            getQueue().dequeue(this);
             if(_stateChangeListeners != null)
             {
                 notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
@@ -331,9 +331,9 @@ public abstract class QueueEntryImpl<E e
 
     private void notifyStateChange(final State oldState, final State newState)
     {
-        for(StateChangeListener<? super E, State> l : _stateChangeListeners)
+        for(StateChangeListener<? super QueueEntry, State> l : _stateChangeListeners)
         {
-            l.stateChanged((E)this, oldState, newState);
+            l.stateChanged(this, oldState, newState);
         }
     }
 
@@ -343,7 +343,7 @@ public abstract class QueueEntryImpl<E e
 
         if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
         {
-            _queueEntryList.entryDeleted((E)this);
+            _queueEntryList.entryDeleted(this);
             onDelete();
             _message.release();
 
@@ -362,10 +362,10 @@ public abstract class QueueEntryImpl<E e
         dispose();
     }
 
-    public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action, ServerTransaction txn)
+    public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
     {
         final AMQQueue currentQueue = getQueue();
-        Exchange alternateExchange = currentQueue.getAlternateExchange();
+        ExchangeImpl alternateExchange = currentQueue.getAlternateExchange();
         boolean autocommit =  txn == null;
         int enqueues;
 
@@ -412,21 +412,21 @@ public abstract class QueueEntryImpl<E e
         return getQueue().isDeleted();
     }
 
-    public void addStateChangeListener(StateChangeListener<? super E,State> listener)
+    public void addStateChangeListener(StateChangeListener<? super MessageInstance,State> listener)
     {
-        Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
+        Set<StateChangeListener<? super QueueEntry, State>> listeners = _stateChangeListeners;
         if(listeners == null)
         {
-            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<? super E, State>>());
+            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<? super QueueEntry, State>>());
             listeners = _stateChangeListeners;
         }
 
         listeners.add(listener);
     }
 
-    public boolean removeStateChangeListener(StateChangeListener<? super E, State> listener)
+    public boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, State> listener)
     {
-        Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
+        Set<StateChangeListener<? super QueueEntry, State>> listeners = _stateChangeListeners;
         if(listeners != null)
         {
             return listeners.remove(listener);
@@ -436,9 +436,13 @@ public abstract class QueueEntryImpl<E e
     }
 
 
-    public int compareTo(final E o)
+    public int compareTo(final QueueEntry o)
     {
-        E other = o;
+        if(o == null)
+        {
+            return 1;
+        }
+        QueueEntryImpl other = (QueueEntryImpl)o;
         return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
     }
 
@@ -446,7 +450,7 @@ public abstract class QueueEntryImpl<E e
     {
     }
 
-    public L getQueueEntryList()
+    public QueueEntryList getQueueEntryList()
     {
         return _queueEntryList;
     }
@@ -464,7 +468,7 @@ public abstract class QueueEntryImpl<E e
     @Override
     public int getMaximumDeliveryCount()
     {
-        return getQueue().getMaximumDeliveryCount();
+        return getQueue().getMaximumDeliveryAttempts();
     }
 
     public void incrementDeliveryCount()
@@ -494,10 +498,10 @@ public abstract class QueueEntryImpl<E e
     @Override
     public boolean resend()
     {
-        QueueConsumer<?,E,Q,L> sub = getDeliveredConsumer();
+        QueueConsumer sub = getDeliveredConsumer();
         if(sub != null)
         {
-            return sub.resend((E)this);
+            return sub.resend(this);
         }
         return false;
     }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java Wed Feb 26 23:27:39 2014
@@ -20,13 +20,11 @@
 */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.consumer.Consumer;
-
-public interface QueueEntryIterator<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
+public interface QueueEntryIterator
 {
     boolean atTail();
 
-    E getNode();
+    QueueEntry getNode();
 
     boolean advance();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Wed Feb 26 23:27:39 2014
@@ -23,19 +23,19 @@ package org.apache.qpid.server.queue;
 import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.message.ServerMessage;
 
-public interface QueueEntryList<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
+public interface QueueEntryList
 {
-    Q getQueue();
+    AMQQueue getQueue();
 
-    E add(ServerMessage message);
+    QueueEntry add(ServerMessage message);
 
-    E next(E node);
+    QueueEntry next(QueueEntry node);
 
-    QueueEntryIterator<E,Q,L,C> iterator();
+    QueueEntryIterator iterator();
 
-    E getHead();
+    QueueEntry getHead();
 
-    void entryDeleted(E queueEntry);
+    void entryDeleted(QueueEntry queueEntry);
     
     int getPriorities();
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java Wed Feb 26 23:27:39 2014
@@ -20,6 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-public interface QueueEntryListBase<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> extends QueueEntryList<E,Q,L,QueueConsumer<?,E,Q,L>>
+public interface QueueEntryListBase extends QueueEntryList
 {
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java Wed Feb 26 23:27:39 2014
@@ -20,7 +20,7 @@
 */
 package org.apache.qpid.server.queue;
 
-interface QueueEntryListFactory<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>>
+interface QueueEntryListFactory
 {
-    public L createQueueEntryList(Q queue);
+    public QueueEntryList createQueueEntryList(AMQQueue<?> queue);
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java Wed Feb 26 23:27:39 2014
@@ -16,9 +16,7 @@
 */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.consumer.Consumer;
-
-public interface QueueEntryVisitor<E extends QueueEntry>
+public interface QueueEntryVisitor
 {
-    boolean visit(E entry);
+    boolean visit(QueueEntry entry);
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Wed Feb 26 23:27:39 2014
@@ -28,7 +28,7 @@ import org.apache.qpid.server.virtualhos
 
 import java.util.Map;
 
-public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
+public class SortedQueue extends OutOfOrderQueue
 {
     //Lock object to synchronize enqueue. Used instead of the object
     //monitor to prevent lock order issues with consumer sendLocks
@@ -38,7 +38,7 @@ public class SortedQueue extends OutOfOr
 
     protected SortedQueue(VirtualHost virtualHost,
                           Map<String, Object> attributes,
-                          QueueEntryListFactory<SortedQueueEntry, SortedQueue, SortedQueueEntryList> factory)
+                          QueueEntryListFactory factory)
     {
         super(virtualHost, attributes, factory);
         _sortedPropertyName = MapValueConverter.getStringAttribute(Queue.SORT_KEY,attributes);
@@ -62,7 +62,7 @@ public class SortedQueue extends OutOfOr
 
     @Override
     public void enqueue(final ServerMessage message,
-                        final Action<? super MessageInstance<?, QueueConsumer<?, SortedQueueEntry, SortedQueue, SortedQueueEntryList>>> action)
+                        final Action<? super MessageInstance> action)
     {
         synchronized (_sortedQueueLock)
         {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java Wed Feb 26 23:27:39 2014
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
 /**
  * An implementation of QueueEntryImpl to be used in SortedQueueEntryList.
  */
-public class SortedQueueEntry extends QueueEntryImpl<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
+public class SortedQueueEntry extends QueueEntryImpl
 {
     public static enum Colour
     {
@@ -52,8 +52,9 @@ public class SortedQueueEntry extends Qu
     }
 
     @Override
-    public int compareTo(final SortedQueueEntry o)
+    public int compareTo(final QueueEntry other)
     {
+        SortedQueueEntry o = (SortedQueueEntry)other;
         final String otherKey = o._key;
         final int compare = _key == null ? (otherKey == null ? 0 : -1) : otherKey == null ? 1 : _key.compareTo(otherKey);
         return compare == 0 ? super.compareTo(o) : compare;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Wed Feb 26 23:27:39 2014
@@ -30,7 +30,7 @@ import org.apache.qpid.server.queue.Sort
  * ISBN-13: 978-0262033848
  * see http://en.wikipedia.org/wiki/Red-black_tree
  */
-public class SortedQueueEntryList implements QueueEntryListBase<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
+public class SortedQueueEntryList implements QueueEntryList
 {
     private final SortedQueueEntry _head;
     private SortedQueueEntry _root;
@@ -279,8 +279,9 @@ public class SortedQueueEntryList implem
         return (node == null ? Colour.BLACK : node.getColour()) == colour;
     }
 
-    public SortedQueueEntry next(final SortedQueueEntry node)
+    public SortedQueueEntry next(final QueueEntry entry)
     {
+        SortedQueueEntry node = (SortedQueueEntry)entry;
         synchronized(_lock)
         {
             if(node.isDeleted() && _head != node)
@@ -308,7 +309,7 @@ public class SortedQueueEntryList implem
         }
     }
 
-    public QueueEntryIterator<SortedQueueEntry,SortedQueue,SortedQueueEntryList,QueueConsumer<?,SortedQueueEntry,SortedQueue,SortedQueueEntryList>> iterator()
+    public QueueEntryIterator iterator()
     {
         return new QueueEntryIteratorImpl(_head);
     }
@@ -323,8 +324,9 @@ public class SortedQueueEntryList implem
         return _root;
     }
 
-    public void entryDeleted(final SortedQueueEntry entry)
+    public void entryDeleted(final QueueEntry e)
     {
+        SortedQueueEntry entry = (SortedQueueEntry)e;
         synchronized(_lock)
         {
             // If the node to be removed has two children, we swap the position
@@ -618,7 +620,7 @@ public class SortedQueueEntryList implem
         return x == null ? null : x.getColour();
     }
 
-    public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntry,SortedQueue,SortedQueueEntryList,QueueConsumer<?,SortedQueueEntry,SortedQueue,SortedQueueEntryList>>
+    public class QueueEntryIteratorImpl implements QueueEntryIterator
     {
         private SortedQueueEntry _lastNode;
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java Wed Feb 26 23:27:39 2014
@@ -19,7 +19,7 @@
  */
 package org.apache.qpid.server.queue;
 
-public class SortedQueueEntryListFactory implements QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>
+public class SortedQueueEntryListFactory implements QueueEntryListFactory
 {
 
     private final String _propertyName;
@@ -30,8 +30,8 @@ public class SortedQueueEntryListFactory
     }
 
     @Override
-    public SortedQueueEntryList createQueueEntryList(final SortedQueue queue)
+    public SortedQueueEntryList createQueueEntryList(final AMQQueue<?> queue)
     {
-        return new SortedQueueEntryList(queue, _propertyName);
+        return new SortedQueueEntryList((SortedQueue) queue, _propertyName);
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java Wed Feb 26 23:27:39 2014
@@ -25,7 +25,7 @@ import org.apache.qpid.server.virtualhos
 
 import java.util.Map;
 
-public class StandardQueue extends AbstractQueue<StandardQueueEntry,StandardQueue,StandardQueueEntryList>
+public class StandardQueue extends AbstractQueue
 {
     public StandardQueue(final VirtualHost virtualHost,
                          final Map<String, Object> arguments)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java Wed Feb 26 23:27:39 2014
@@ -22,7 +22,7 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.ServerMessage;
 
-public class StandardQueueEntry extends OrderedQueueEntry<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
+public class StandardQueueEntry extends OrderedQueueEntry
 {
     protected StandardQueueEntry(final StandardQueueEntryList queueEntryList)
     {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java Wed Feb 26 23:27:39 2014
@@ -22,15 +22,15 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.ServerMessage;
 
-public class StandardQueueEntryList extends OrderedQueueEntryList<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
+public class StandardQueueEntryList extends OrderedQueueEntryList
 {
 
-    private static final HeadCreator<StandardQueueEntry, StandardQueue, StandardQueueEntryList> HEAD_CREATOR = new HeadCreator<StandardQueueEntry, StandardQueue, StandardQueueEntryList>()
+    private static final HeadCreator HEAD_CREATOR = new HeadCreator()
     {
         @Override
-        public StandardQueueEntry createHead(final StandardQueueEntryList list)
+        public StandardQueueEntry createHead(final QueueEntryList list)
         {
-            return new StandardQueueEntry(list);
+            return new StandardQueueEntry((StandardQueueEntryList) list);
         }
     };
 
@@ -45,12 +45,12 @@ public class StandardQueueEntryList exte
         return new StandardQueueEntry(this, message);
     }
 
-    static class Factory implements QueueEntryListFactory<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
+    static class Factory implements QueueEntryListFactory
     {
 
-        public StandardQueueEntryList createQueueEntryList(StandardQueue queue)
+        public StandardQueueEntryList createQueueEntryList(AMQQueue<?> queue)
         {
-            return new StandardQueueEntryList(queue);
+            return new StandardQueueEntryList((StandardQueue) queue);
         }
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org