You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/02/12 14:27:57 UTC

svn commit: r1567616 [5/12] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/ qpid/cpp/bindings/qmf2/ruby/ qpid/cpp/bindings/qpid/examples/perl/ qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/lib/qpid/messaging/ qpid/cpp/bindings/qpid/ruby/ qp...

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Feb 12 13:27:51 2014
@@ -26,19 +26,20 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeReferrer;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.protocol.CapacityChecker;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
-public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue
+public interface AMQQueue<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer>
+        extends Comparable<Q>, ExchangeReferrer, BaseQueue<C>, MessageSource<C,Q>, CapacityChecker, MessageDestination
 {
-    String getName();
 
     public interface NotificationListener
     {
@@ -65,45 +66,20 @@ public interface AMQQueue extends Compar
 
     long getTotalEnqueueCount();
 
-    public interface Context
-    {
-        QueueEntry getLastSeenEntry();
-    }
-
     void setNoLocal(boolean b);
 
     boolean isAutoDelete();
 
     String getOwner();
-    AuthorizationHolder getAuthorizationHolder();
-    void setAuthorizationHolder(AuthorizationHolder principalHolder);
-
-    void setExclusiveOwningSession(AMQSessionModel owner);
-    AMQSessionModel getExclusiveOwningSession();
 
     VirtualHost getVirtualHost();
 
-    void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;
-
-    void unregisterSubscription(final Subscription subscription) throws AMQException;
-
-    Collection<Subscription> getConsumers();
-
-    interface SubscriptionRegistrationListener
-    {
-        void subscriptionRegistered(AMQQueue queue, Subscription subscription);
-        void subscriptionUnregistered(AMQQueue queue, Subscription subscription);
-    }
-
-    void addSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
-    void removeSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
-
 
     int getConsumerCount();
 
     int getActiveConsumerCount();
 
-    boolean hasExclusiveSubscriber();
+    boolean hasExclusiveConsumer();
 
     boolean isUnused();
 
@@ -111,41 +87,35 @@ public interface AMQQueue extends Compar
 
     int getMessageCount();
 
-    int getUndeliveredMessageCount();
-
 
     long getQueueDepth();
 
-    long getReceivedMessageCount();
-
     long getOldestMessageArrivalTime();
 
     boolean isDeleted();
 
     int delete() throws AMQException;
 
-    void requeue(QueueEntry entry);
+    void requeue(E entry);
 
-    void dequeue(QueueEntry entry, Subscription sub);
+    void dequeue(E entry);
 
-    void decrementUnackedMsgCount(QueueEntry queueEntry);
+    void decrementUnackedMsgCount(E queueEntry);
 
-    boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
+    boolean resend(final E entry, final C consumer) throws AMQException;
 
-    void addQueueDeleteTask(final Task task);
-    void removeQueueDeleteTask(final Task task);
+    void addQueueDeleteTask(Action<AMQQueue> task);
+    void removeQueueDeleteTask(Action<AMQQueue> task);
 
 
 
-    List<QueueEntry> getMessagesOnTheQueue();
-
-    List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId);
+    List<E> getMessagesOnTheQueue();
 
     List<Long> getMessagesOnTheQueue(int num);
 
     List<Long> getMessagesOnTheQueue(int num, int offset);
 
-    QueueEntry getMessageOnTheQueue(long messageId);
+    E getMessageOnTheQueue(long messageId);
 
     /**
      * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
@@ -156,9 +126,9 @@ public interface AMQQueue extends Compar
      * @param toPosition
      * @return
      */
-    public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition);
+    public List<E> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition);
 
-    void visit(QueueEntryVisitor visitor);
+    void visit(QueueEntryVisitor<E> visitor);
 
 
     long getMaximumMessageSize();
@@ -209,16 +179,10 @@ public interface AMQQueue extends Compar
 
     Set<NotificationCheck> getNotificationChecks();
 
-    void flushSubscription(final Subscription sub) throws AMQException;
-
-    void deliverAsync(final Subscription sub);
-
     void deliverAsync();
 
     void stop();
 
-    boolean isExclusive();
-
     Exchange getAlternateExchange();
 
     void setAlternateExchange(Exchange exchange);
@@ -226,56 +190,6 @@ public interface AMQQueue extends Compar
     Collection<String> getAvailableAttributes();
     Object getAttribute(String attrName);
 
-    void checkCapacity(AMQSessionModel channel);
-
-    /**
-     * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
-     * already exists.
-     *
-     * <p/><table id="crc"><caption>CRC Card</caption>
-     * <tr><th> Responsibilities <th> Collaborations
-     * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
-     * </table>
-     *
-     * @todo Not an AMQP exception as no status code.
-     *
-     * @todo Move to top level, used outside this class.
-     */
-    static final class ExistingExclusiveSubscription extends AMQException
-    {
-
-        public ExistingExclusiveSubscription()
-        {
-            super("");
-        }
-    }
-
-    /**
-     * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusive subscription, as a subscription
-     * already exists.
-     *
-     * <p/><table id="crc"><caption>CRC Card</caption>
-     * <tr><th> Responsibilities <th> Collaborations
-     * <tr><td> Represent failure to create an exclusive subscription, as a subscription already exists.
-     * </table>
-     *
-     * @todo Not an AMQP exception as no status code.
-     *
-     * @todo Move to top level, used outside this class.
-     */
-    static final class ExistingSubscriptionPreventsExclusive extends AMQException
-    {
-        public ExistingSubscriptionPreventsExclusive()
-        {
-            super("");
-        }
-    }
-
-    static interface Task
-    {
-        public void doTask(AMQQueue queue) throws AMQException;
-    }
-
     void configure(QueueConfiguration config);
 
     void setExclusive(boolean exclusive);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Wed Feb 12 13:27:51 2014
@@ -288,11 +288,11 @@ public class AMQQueueFactory implements 
         }
         else if(priorities > 1)
         {
-            q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities);
+            q = new PriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments, priorities);
         }
         else
         {
-            q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments);
+            q = new StandardQueue(id, queueName, durable, owner, autoDelete, exclusive, _virtualHost, arguments);
         }
 
         q.setDeleteOnNoConsumers(deleteOnNoConsumer);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Wed Feb 12 13:27:51 2014
@@ -22,19 +22,15 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.util.Action;
 
-public interface BaseQueue extends TransactionLogResource
+public interface BaseQueue<C extends Consumer> extends TransactionLogResource
 {
-    public static interface PostEnqueueAction
-    {
-        public void onEnqueue(QueueEntry entry);
-    }
-
-    void enqueue(ServerMessage message) throws AMQException;
-    void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException;
-    void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException;
+    void enqueue(ServerMessage message, Action<? super MessageInstance<?,C>> action) throws AMQException;
 
     boolean isDurable();
     boolean isDeleted();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java Wed Feb 12 13:27:51 2014
@@ -26,7 +26,7 @@ import java.util.UUID;
 
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-public class ConflationQueue extends SimpleAMQQueue
+public class ConflationQueue extends SimpleAMQQueue<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
 {
     protected ConflationQueue(UUID id,
                               String name,
@@ -42,7 +42,7 @@ public class ConflationQueue extends Sim
 
     public String getConflationKey()
     {
-        return ((ConflationQueueList) getEntries()).getConflationKey();
+        return getEntries().getConflationKey();
     }
 
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Wed Feb 12 13:27:51 2014
@@ -32,23 +32,38 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
-public class ConflationQueueList extends SimpleQueueEntryList
+public class ConflationQueueList extends OrderedQueueEntryList<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class);
 
+    private static final HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList> HEAD_CREATOR = new HeadCreator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>()
+    {
+
+        @Override
+        public ConflationQueueEntry createHead(final ConflationQueueList list)
+        {
+            return list.createHead();
+        }
+    };
+
     private final String _conflationKey;
-    private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap =
-        new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
+    private final ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>> _latestValuesMap =
+        new ConcurrentHashMap<Object, AtomicReference<ConflationQueueEntry>>();
 
-    private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
-    private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
+    private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this);
+    private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this);
 
-    public ConflationQueueList(AMQQueue queue, String conflationKey)
+    public ConflationQueueList(ConflationQueue queue, String conflationKey)
     {
-        super(queue);
+        super(queue, HEAD_CREATOR);
         _conflationKey = conflationKey;
     }
 
+    private ConflationQueueEntry createHead()
+    {
+        return new ConflationQueueEntry(this);
+    }
+
     public String getConflationKey()
     {
         return _conflationKey;
@@ -66,7 +81,7 @@ public class ConflationQueueList extends
     @Override
     public ConflationQueueEntry add(final ServerMessage message)
     {
-        final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message));
+        final ConflationQueueEntry addedEntry = super.add(message);
 
         final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
         if (keyValue != null)
@@ -76,14 +91,14 @@ public class ConflationQueueList extends
                 LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue);
             }
 
-            final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry);
-            AtomicReference<QueueEntry> entryReferenceFromMap = null;
-            QueueEntry entryFromMap;
+            final AtomicReference<ConflationQueueEntry> referenceToEntry = new AtomicReference<ConflationQueueEntry>(addedEntry);
+            AtomicReference<ConflationQueueEntry> entryReferenceFromMap;
+            ConflationQueueEntry entryFromMap;
 
             // Iterate until we have got a valid atomic reference object and either the referent is newer than the current
             // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value
             // indicating that the reference object is no longer valid (it is being removed from the map).
-            boolean keepTryingToUpdateEntryReference = true;
+            boolean keepTryingToUpdateEntryReference;
             do
             {
                 do
@@ -139,16 +154,16 @@ public class ConflationQueueList extends
      * adds and removes during execution of this method.</li>
      * </ul>
      */
-    private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue)
+    private AtomicReference<ConflationQueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<ConflationQueueEntry> referenceToAddedValue)
     {
-        AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
+        AtomicReference<ConflationQueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
 
         if(latestValueReference == null)
         {
             latestValueReference = _latestValuesMap.get(key);
             if(latestValueReference == null)
             {
-                return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone);
+                return new AtomicReference<ConflationQueueEntry>(_newerEntryAlreadyBeenAndGone);
             }
         }
         return latestValueReference;
@@ -177,12 +192,17 @@ public class ConflationQueueList extends
         }
     }
 
-    private final class ConflationQueueEntry extends SimpleQueueEntryImpl
+    final class ConflationQueueEntry extends OrderedQueueEntry<ConflationQueueEntry, ConflationQueue, ConflationQueueList>
     {
 
-        private AtomicReference<QueueEntry> _latestValueReference;
+        private AtomicReference<ConflationQueueEntry> _latestValueReference;
+
+        private ConflationQueueEntry(final ConflationQueueList queueEntryList)
+        {
+            super(queueEntryList);
+        }
 
-        public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message)
+        public ConflationQueueEntry(ConflationQueueList queueEntryList, ServerMessage message)
         {
             super(queueEntryList, message);
         }
@@ -206,7 +226,7 @@ public class ConflationQueueList extends
 
         }
 
-        public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)
+        public void setLatestValueReference(final AtomicReference<ConflationQueueEntry> latestValueReference)
         {
             _latestValueReference = latestValueReference;
         }
@@ -227,12 +247,12 @@ public class ConflationQueueList extends
     /**
      * Exposed purposes of unit test only.
      */
-    Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap()
+    Map<Object, AtomicReference<ConflationQueueEntry>> getLatestValuesMap()
     {
         return Collections.unmodifiableMap(_latestValuesMap);
     }
 
-    static class Factory implements QueueEntryListFactory
+    static class Factory implements QueueEntryListFactory<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>
     {
         private final String _conflationKey;
 
@@ -241,7 +261,8 @@ public class ConflationQueueList extends
             _conflationKey = conflationKey;
         }
 
-        public ConflationQueueList createQueueEntryList(AMQQueue queue)
+        @Override
+        public ConflationQueueList createQueueEntryList(final ConflationQueue queue)
         {
             return new ConflationQueueList(queue, _conflationKey);
         }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Wed Feb 12 13:27:51 2014
@@ -20,37 +20,35 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
 import java.util.UUID;
 
-public abstract class OutOfOrderQueue extends SimpleAMQQueue
+public abstract class OutOfOrderQueue<E extends QueueEntryImpl<E,Q,L>, Q extends OutOfOrderQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends SimpleAMQQueue<E,Q,L>
 {
 
     protected OutOfOrderQueue(UUID id, String name, boolean durable,
                               String owner, boolean autoDelete, boolean exclusive,
-                              VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+                              VirtualHost virtualHost, QueueEntryListFactory<E,Q,L> entryListFactory, Map<String, Object> arguments)
     {
         super(id, name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments);
     }
 
     @Override
-    protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+    protected void checkConsumersNotAheadOfDelivery(final E entry)
     {
-        // check that all subscriptions are not in advance of the entry
-        SubscriptionList.SubscriptionNodeIterator subIter = getSubscriptionList().iterator();
+        // check that all consumers are not in advance of the entry
+        QueueConsumerList.ConsumerNodeIterator<E,Q,L> subIter = getConsumerList().iterator();
         while(subIter.advance() && !entry.isAcquired())
         {
-            final Subscription subscription = subIter.getNode().getSubscription();
-            if(!subscription.isClosed())
+            final QueueConsumer<?,E,Q,L> consumer = subIter.getNode().getConsumer();
+            if(!consumer.isClosed())
             {
-                QueueContext context = (QueueContext) subscription.getQueueContext();
+                QueueContext<E,Q,L> context = consumer.getQueueContext();
                 if(context != null)
                 {
-                    QueueEntry released = context.getReleasedEntry();
+                    E released = context.getReleasedEntry();
                     while(!entry.isAcquired() && (released == null || released.compareTo(entry) > 0))
                     {
                         if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Wed Feb 12 13:27:51 2014
@@ -22,132 +22,162 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.message.ServerMessage;
 
-public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
+abstract public class PriorityQueueList extends OrderedQueueEntryList<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList>
 {
-    private final AMQQueue _queue;
-    private final PriorityQueueEntrySubList[] _priorityLists;
-    private final int _priorities;
-    private final int _priorityOffset;
 
-    public PriorityQueueList(AMQQueue queue, int priorities)
-    {
-        _queue = queue;
-        _priorityLists = new PriorityQueueEntrySubList[priorities];
-        _priorities = priorities;
-        _priorityOffset = 5-((priorities + 1)/2);
-        for(int i = 0; i < priorities; i++)
-        {
-            _priorityLists[i] = new PriorityQueueEntrySubList(queue, i);
-        }
-    }
 
-    public int getPriorities()
+    public PriorityQueueList(final PriorityQueue queue,
+                             final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> headCreator)
     {
-        return _priorities;
+        super(queue, headCreator);
     }
 
-    public AMQQueue getQueue()
+    static class PriorityQueueMasterList extends PriorityQueueList
     {
-        return _queue;
-    }
+        private static final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> DUMMY_HEAD_CREATOR =
+                new HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList>()
+                {
+                    @Override
+                    public PriorityQueueEntry createHead(final PriorityQueueList list)
+                    {
+                        return null;
+                    }
+                };
+        private final PriorityQueue _queue;
+        private final PriorityQueueEntrySubList[] _priorityLists;
+        private final int _priorities;
+        private final int _priorityOffset;
 
-    public SimpleQueueEntryImpl add(ServerMessage message)
-    {
-        int index = message.getMessageHeader().getPriority() - _priorityOffset;
-        if(index >= _priorities)
+        public PriorityQueueMasterList(PriorityQueue queue, int priorities)
         {
-            index = _priorities-1;
+            super(queue, DUMMY_HEAD_CREATOR);
+            _queue = queue;
+            _priorityLists = new PriorityQueueEntrySubList[priorities];
+            _priorities = priorities;
+            _priorityOffset = 5-((priorities + 1)/2);
+            for(int i = 0; i < priorities; i++)
+            {
+                _priorityLists[i] = new PriorityQueueEntrySubList(queue, i);
+            }
         }
-        else if(index < 0)
+
+        public int getPriorities()
         {
-            index = 0;
+            return _priorities;
         }
-        return _priorityLists[index].add(message);
-
-    }
-
-    public SimpleQueueEntryImpl next(SimpleQueueEntryImpl node)
-    {
-        SimpleQueueEntryImpl next = node.getNextValidEntry();
 
-        if(next == null)
+        public PriorityQueue getQueue()
         {
-            final QueueEntryList<?> nodeEntryList = node.getQueueEntryList();
-            int index;
-            for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--) {};
+            return _queue;
+        }
 
-            while(next == null && index != 0)
+        public PriorityQueueEntry add(ServerMessage message)
+        {
+            int index = message.getMessageHeader().getPriority() - _priorityOffset;
+            if(index >= _priorities)
+            {
+                index = _priorities-1;
+            }
+            else if(index < 0)
             {
-                index--;
-                next = _priorityLists[index].getHead().getNextValidEntry();
+                index = 0;
             }
+            return _priorityLists[index].add(message);
 
         }
-        return next;
-    }
 
-    private final class PriorityQueueEntryListIterator implements QueueEntryIterator<SimpleQueueEntryImpl>
-    {
-        private final SimpleQueueEntryList.QueueEntryIteratorImpl[] _iterators = new SimpleQueueEntryList.QueueEntryIteratorImpl[ _priorityLists.length ];
-        private SimpleQueueEntryImpl _lastNode;
+        @Override
+        protected PriorityQueueEntry createQueueEntry(final ServerMessage<?> message)
+        {
+            throw new UnsupportedOperationException();
+        }
 
-        PriorityQueueEntryListIterator()
+        public PriorityQueueEntry next(PriorityQueueEntry node)
         {
-            for(int i = 0; i < _priorityLists.length; i++)
+            PriorityQueueEntry next = node.getNextValidEntry();
+
+            if(next == null)
             {
-                _iterators[i] = _priorityLists[i].iterator();
+                final PriorityQueueList nodeEntryList = node.getQueueEntryList();
+                int index;
+                for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--)
+                {
+                    // do nothing; the loop is just to find the index
+                }
+
+                while(next == null && index != 0)
+                {
+                    index--;
+                    next = _priorityLists[index].getHead().getNextValidEntry();
+                }
+
             }
-            _lastNode = _iterators[_iterators.length - 1].getNode();
+            return next;
         }
 
-
-        public boolean atTail()
+        private final class PriorityQueueEntryListIterator implements QueueEntryIterator<PriorityQueueEntry, PriorityQueue, PriorityQueueList, QueueConsumer<?,PriorityQueueEntry, PriorityQueue, PriorityQueueList>>
         {
-            for(int i = 0; i < _iterators.length; i++)
+            private final QueueEntryIterator<PriorityQueueEntry, PriorityQueue, PriorityQueueList,QueueConsumer<?,PriorityQueueEntry, PriorityQueue, PriorityQueueList>>[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
+            private PriorityQueueEntry _lastNode;
+
+            PriorityQueueEntryListIterator()
             {
-                if(!_iterators[i].atTail())
+                for(int i = 0; i < _priorityLists.length; i++)
                 {
-                    return false;
+                    _iterators[i] = _priorityLists[i].iterator();
                 }
+                _lastNode = _iterators[_iterators.length - 1].getNode();
             }
-            return true;
-        }
 
-        public SimpleQueueEntryImpl getNode()
-        {
-            return _lastNode;
-        }
 
-        public boolean advance()
-        {
-            for(int i = _iterators.length-1; i >= 0; i--)
+            public boolean atTail()
             {
-                if(_iterators[i].advance())
+                for(int i = 0; i < _iterators.length; i++)
                 {
-                    _lastNode = _iterators[i].getNode();
-                    return true;
+                    if(!_iterators[i].atTail())
+                    {
+                        return false;
+                    }
                 }
+                return true;
+            }
+
+            public PriorityQueueEntry getNode()
+            {
+                return _lastNode;
+            }
+
+            public boolean advance()
+            {
+                for(int i = _iterators.length-1; i >= 0; i--)
+                {
+                    if(_iterators[i].advance())
+                    {
+                        _lastNode = _iterators[i].getNode();
+                        return true;
+                    }
+                }
+                return false;
             }
-            return false;
         }
-    }
 
-    public PriorityQueueEntryListIterator iterator()
-    {
-        return new PriorityQueueEntryListIterator();
-    }
+        public PriorityQueueEntryListIterator iterator()
+        {
 
-    public SimpleQueueEntryImpl getHead()
-    {
-        return _priorityLists[_priorities-1].getHead();
-    }
+            return new PriorityQueueEntryListIterator();
+        }
 
-    public void entryDeleted(final SimpleQueueEntryImpl queueEntry)
-    {
+        public PriorityQueueEntry getHead()
+        {
+            return _priorityLists[_priorities-1].getHead();
+        }
 
-    }
+        public void entryDeleted(final PriorityQueueEntry queueEntry)
+        {
 
-    static class Factory implements QueueEntryListFactory
+        }
+    }
+    static class Factory implements QueueEntryListFactory<PriorityQueueEntry, PriorityQueue, PriorityQueueList>
     {
         private final int _priorities;
 
@@ -156,26 +186,34 @@ public class PriorityQueueList implement
             _priorities = priorities;
         }
 
-        public PriorityQueueList createQueueEntryList(AMQQueue queue)
+        public PriorityQueueList createQueueEntryList(PriorityQueue queue)
         {
-            return new PriorityQueueList(queue, _priorities);
+            return new PriorityQueueMasterList(queue, _priorities);
         }
     }
 
-    private static class PriorityQueueEntrySubList extends SimpleQueueEntryList
+    static class PriorityQueueEntrySubList extends PriorityQueueList
     {
+        private static final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> HEAD_CREATOR = new HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList>()
+        {
+            @Override
+            public PriorityQueueEntry createHead(final PriorityQueueList list)
+            {
+                return new PriorityQueueEntry(list);
+            }
+        };
         private int _listPriority;
 
-        public PriorityQueueEntrySubList(AMQQueue queue, int listPriority)
+        public PriorityQueueEntrySubList(PriorityQueue queue, int listPriority)
         {
-            super(queue);
+            super(queue, HEAD_CREATOR);
             _listPriority = listPriority;
         }
 
         @Override
-        protected PriorityQueueEntryImpl createQueueEntry(ServerMessage<?> message)
+        protected PriorityQueueEntry createQueueEntry(ServerMessage<?> message)
         {
-            return new PriorityQueueEntryImpl(this, message);
+            return new PriorityQueueEntry(this, message);
         }
 
         public int getListPriority()
@@ -184,17 +222,22 @@ public class PriorityQueueList implement
         }
     }
 
-    private static class PriorityQueueEntryImpl extends SimpleQueueEntryImpl
+    static class PriorityQueueEntry extends OrderedQueueEntry<PriorityQueueEntry, PriorityQueue, PriorityQueueList>
     {
-        public PriorityQueueEntryImpl(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message)
+        private PriorityQueueEntry(final PriorityQueueList queueEntryList)
+        {
+            super(queueEntryList);
+        }
+
+        public PriorityQueueEntry(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message)
         {
             super(queueEntryList, message);
         }
 
         @Override
-        public int compareTo(final QueueEntry o)
+        public int compareTo(final PriorityQueueEntry o)
         {
-            PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)((PriorityQueueEntryImpl)o).getQueueEntryList();
+            PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)o.getQueueEntryList();
             int otherPriority = pqel.getListPriority();
             int thisPriority = ((PriorityQueueEntrySubList) getQueueEntryList()).getListPriority();
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java Wed Feb 12 13:27:51 2014
@@ -23,32 +23,32 @@ package org.apache.qpid.server.queue;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-final class QueueContext implements AMQQueue.Context
+final class QueueContext<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
 {
-    private volatile QueueEntry _lastSeenEntry;
-    private volatile QueueEntry _releasedEntry;
+    private volatile E _lastSeenEntry;
+    private volatile E _releasedEntry;
 
-    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
             _lastSeenUpdater =
         AtomicReferenceFieldUpdater.newUpdater
-        (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
-    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
+        (QueueContext.class, QueueEntryImpl.class, "_lastSeenEntry");
+    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
             _releasedUpdater =
         AtomicReferenceFieldUpdater.newUpdater
-        (QueueContext.class, QueueEntry.class, "_releasedEntry");
+        (QueueContext.class, QueueEntryImpl.class, "_releasedEntry");
 
-    public QueueContext(QueueEntry head)
+    public QueueContext(E head)
     {
         _lastSeenEntry = head;
     }
 
-    public QueueEntry getLastSeenEntry()
+    public E getLastSeenEntry()
     {
         return _lastSeenEntry;
     }
 
 
-    QueueEntry getReleasedEntry()
+    E getReleasedEntry()
     {
         return _releasedEntry;
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Wed Feb 12 13:27:51 2014
@@ -20,207 +20,21 @@
 */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.ServerTransaction;
 
-public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
+public interface QueueEntry<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer> extends MessageInstance<E,C>, Comparable<E>
 {
 
-
-
-    public static enum State
-    {
-        AVAILABLE,
-        ACQUIRED,
-        EXPIRED,
-        DEQUEUED,
-        DELETED;
-
-
-    }
-
-    public static interface StateChangeListener
-    {
-        public void stateChanged(QueueEntry entry, State oldSate, State newState);
-    }
-
-    public abstract class EntryState
-    {
-        private EntryState()
-        {
-        }
-
-        public abstract State getState();
-
-        /**
-         * Returns true if state is either DEQUEUED or DELETED.
-         *
-         * @return true if state is either DEQUEUED or DELETED.
-         */
-        public boolean isDispensed()
-        {
-            State currentState = getState();
-            return currentState == State.DEQUEUED || currentState == State.DELETED;
-        }
-    }
-
-
-    public final class AvailableState extends EntryState
-    {
-
-        public State getState()
-        {
-            return State.AVAILABLE;
-        }
-
-        public String toString()
-        {
-            return getState().name();
-        }
-    }
-
-
-    public final class DequeuedState extends EntryState
-    {
-
-        public State getState()
-        {
-            return State.DEQUEUED;
-        }
-
-        public String toString()
-        {
-            return getState().name();
-        }
-    }
-
-
-    public final class DeletedState extends EntryState
-    {
-
-        public State getState()
-        {
-            return State.DELETED;
-        }
-
-        public String toString()
-        {
-            return getState().name();
-        }
-    }
-
-    public final class ExpiredState extends EntryState
-    {
-
-        public State getState()
-        {
-            return State.EXPIRED;
-        }
-
-        public String toString()
-        {
-            return getState().name();
-        }
-    }
-
-
-    public final class NonSubscriptionAcquiredState extends EntryState
-    {
-        public State getState()
-        {
-            return State.ACQUIRED;
-        }
-
-        public String toString()
-        {
-            return getState().name();
-        }
-    }
-
-    public final class SubscriptionAcquiredState extends EntryState
-    {
-        private final Subscription _subscription;
-
-        public SubscriptionAcquiredState(Subscription subscription)
-        {
-            _subscription = subscription;
-        }
-
-
-        public State getState()
-        {
-            return State.ACQUIRED;
-        }
-
-        public Subscription getSubscription()
-        {
-            return _subscription;
-        }
-
-        public String toString()
-        {
-            return "{" + getState().name() + " : " + _subscription +"}";
-        }
-    }
-
-
-    final static EntryState AVAILABLE_STATE = new AvailableState();
-    final static EntryState DELETED_STATE = new DeletedState();
-    final static EntryState DEQUEUED_STATE = new DequeuedState();
-    final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
-
-
-
-
-    AMQQueue getQueue();
+    Q getQueue();
 
     long getSize();
 
-    boolean getDeliveredToConsumer();
-
-    boolean expired() throws AMQException;
-
-    boolean acquire(Subscription sub);
-
-    boolean acquiredBySubscription();
-    boolean isAcquiredBy(Subscription subscription);
-
-    void setRedelivered();
-
-    boolean isRedelivered();
-
-    Subscription getDeliveredSubscription();
-
-    void reject();
-
-    boolean isRejectedBy(long subscriptionId);
-
-    int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn);
-
     boolean isQueueDeleted();
 
-    QueueEntry getNextNode();
-
-    QueueEntry getNextValidEntry();
-
-    void addStateChangeListener(StateChangeListener listener);
-    boolean removeStateChangeListener(StateChangeListener listener);
-
-
-    /**
-     * Number of times this queue entry has been delivered.
-     *
-     * @return delivery count
-     */
-    int getDeliveryCount();
-
-    void incrementDeliveryCount();
+    E getNextNode();
 
-    void decrementDeliveryCount();
+    E getNextValidEntry();
 
-    Filterable asFilterable();
 
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Feb 12 13:27:51 2014
@@ -26,11 +26,15 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
 
 import java.util.EnumMap;
 import java.util.HashSet;
@@ -40,13 +44,13 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-public abstract class QueueEntryImpl implements QueueEntry
+public abstract class QueueEntryImpl<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> implements QueueEntry<E,Q,QueueConsumer<?,E,Q,L>>
 {
     private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
 
-    private final QueueEntryList _queueEntryList;
+    private final L _queueEntryList;
 
-    private MessageReference _message;
+    private final MessageReference _message;
 
     private Set<Long> _rejectedBy = null;
 
@@ -59,7 +63,7 @@ public abstract class QueueEntryImpl imp
         (QueueEntryImpl.class, EntryState.class, "_state");
 
 
-    private volatile Set<StateChangeListener> _stateChangeListeners;
+    private volatile Set<StateChangeListener<? super E, State>> _stateChangeListeners;
 
     private static final
         AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -86,14 +90,14 @@ public abstract class QueueEntryImpl imp
     private boolean _deliveredToConsumer;
 
 
-    public QueueEntryImpl(QueueEntryList<?> queueEntryList)
+    public QueueEntryImpl(L queueEntryList)
     {
         this(queueEntryList,null,Long.MIN_VALUE);
         _state = DELETED_STATE;
     }
 
 
-    public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message, final long entryId)
+    public QueueEntryImpl(L queueEntryList, ServerMessage message, final long entryId)
     {
         _queueEntryList = queueEntryList;
 
@@ -103,7 +107,7 @@ public abstract class QueueEntryImpl imp
         populateInstanceProperties();
     }
 
-    public QueueEntryImpl(QueueEntryList<?> queueEntryList, ServerMessage message)
+    public QueueEntryImpl(L queueEntryList, ServerMessage message)
     {
         _queueEntryList = queueEntryList;
         _message = message == null ? null :  message.newReference();
@@ -134,7 +138,7 @@ public abstract class QueueEntryImpl imp
         return _entryId;
     }
 
-    public AMQQueue getQueue()
+    public Q getQueue()
     {
         return _queueEntryList.getQueue();
     }
@@ -183,7 +187,7 @@ public abstract class QueueEntryImpl imp
 
     public boolean acquire()
     {
-        return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
+        return acquire(NON_CONSUMER_ACQUIRED_STATE);
     }
 
     private boolean acquire(final EntryState state)
@@ -198,7 +202,7 @@ public abstract class QueueEntryImpl imp
         return acquired;
     }
 
-    public boolean acquire(Subscription sub)
+    public boolean acquire(QueueConsumer<?,E,Q,L> sub)
     {
         final boolean acquired = acquire(sub.getOwningState());
         if(acquired)
@@ -208,17 +212,17 @@ public abstract class QueueEntryImpl imp
         return acquired;
     }
 
-    public boolean acquiredBySubscription()
+    public boolean acquiredByConsumer()
     {
 
-        return (_state instanceof SubscriptionAcquiredState);
+        return (_state instanceof ConsumerAcquiredState);
     }
 
-    public boolean isAcquiredBy(Subscription subscription)
+    public boolean isAcquiredBy(QueueConsumer consumer)
     {
         EntryState state = _state;
-        return state instanceof SubscriptionAcquiredState
-               && ((SubscriptionAcquiredState)state).getSubscription() == subscription;
+        return state instanceof ConsumerAcquiredState
+               && ((ConsumerAcquiredState)state).getConsumer() == consumer;
     }
 
     public void release()
@@ -228,19 +232,14 @@ public abstract class QueueEntryImpl imp
         if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
         {
 
-            if(state instanceof SubscriptionAcquiredState)
+            if(state instanceof ConsumerAcquiredState)
             {
-                getQueue().decrementUnackedMsgCount(this);
-                Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription();
-                if (subscription != null)
-                {
-                    subscription.releaseQueueEntry(this);
-                }
+                getQueue().decrementUnackedMsgCount((E) this);
             }
 
             if(!getQueue().isDeleted())
             {
-                getQueue().requeue(this);
+                getQueue().requeue((E)this);
                 if(_stateChangeListeners != null)
                 {
                     notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
@@ -265,12 +264,12 @@ public abstract class QueueEntryImpl imp
         return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED));
     }
 
-    public Subscription getDeliveredSubscription()
+    public QueueConsumer getDeliveredConsumer()
     {
         EntryState state = _state;
-        if (state instanceof SubscriptionAcquiredState)
+        if (state instanceof ConsumerAcquiredState)
         {
-            return ((SubscriptionAcquiredState) state).getSubscription();
+            return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
         }
         else
         {
@@ -280,16 +279,16 @@ public abstract class QueueEntryImpl imp
 
     public void reject()
     {
-        Subscription subscription = getDeliveredSubscription();
+        QueueConsumer consumer = getDeliveredConsumer();
 
-        if (subscription != null)
+        if (consumer != null)
         {
             if (_rejectedBy == null)
             {
                 _rejectedBy = new HashSet<Long>();
             }
 
-            _rejectedBy.add(subscription.getSubscriptionID());
+            _rejectedBy.add(consumer.getId());
         }
         else
         {
@@ -297,12 +296,12 @@ public abstract class QueueEntryImpl imp
         }
     }
 
-    public boolean isRejectedBy(long subscriptionId)
+    public boolean isRejectedBy(QueueConsumer consumer)
     {
 
-        if (_rejectedBy != null) // We have subscriptions that rejected this message
+        if (_rejectedBy != null) // We have consumers that rejected this message
         {
-            return _rejectedBy.contains(subscriptionId);
+            return _rejectedBy.contains(consumer.getId());
         }
         else // This message hasn't been rejected yet.
         {
@@ -316,15 +315,12 @@ public abstract class QueueEntryImpl imp
 
         if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
         {
-            Subscription s = null;
-            if (state instanceof SubscriptionAcquiredState)
+            if (state instanceof ConsumerAcquiredState)
             {
-                getQueue().decrementUnackedMsgCount(this);
-                s = ((SubscriptionAcquiredState) state).getSubscription();
-                s.onDequeue(this);
+                getQueue().decrementUnackedMsgCount((E) this);
             }
 
-            getQueue().dequeue(this,s);
+            getQueue().dequeue((E)this);
             if(_stateChangeListeners != null)
             {
                 notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
@@ -336,9 +332,9 @@ public abstract class QueueEntryImpl imp
 
     private void notifyStateChange(final State oldState, final State newState)
     {
-        for(StateChangeListener l : _stateChangeListeners)
+        for(StateChangeListener<? super E, State> l : _stateChangeListeners)
         {
-            l.stateChanged(this, oldState, newState);
+            l.stateChanged((E)this, oldState, newState);
         }
     }
 
@@ -348,7 +344,7 @@ public abstract class QueueEntryImpl imp
 
         if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
         {
-            _queueEntryList.entryDeleted(this);
+            _queueEntryList.entryDeleted((E)this);
             onDelete();
             _message.release();
 
@@ -367,44 +363,49 @@ public abstract class QueueEntryImpl imp
         dispose();
     }
 
-    public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn)
+    public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action, ServerTransaction txn)
     {
         final AMQQueue currentQueue = getQueue();
         Exchange alternateExchange = currentQueue.getAlternateExchange();
         boolean autocommit =  txn == null;
+        int enqueues;
+
+        if(autocommit)
+        {
+            txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
+        }
+
         if (alternateExchange != null)
         {
-            if(autocommit)
+            enqueues = alternateExchange.send(getMessage(),
+                                              getInstanceProperties(),
+                                              txn,
+                                              action);
+        }
+        else
+        {
+            enqueues = 0;
+        }
+
+        txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action()
+        {
+            public void postCommit()
             {
-                txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
+                delete();
             }
 
-            int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action);
-
-            txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action()
+            public void onRollback()
             {
-                public void postCommit()
-                {
-                    delete();
-                }
-
-                public void onRollback()
-                {
-
-                }
-            });
 
-            if(autocommit)
-            {
-                txn.commit();
             }
-            return enqueues;
+        });
 
-        }
-        else
+        if(autocommit)
         {
-            return 0;
+            txn.commit();
         }
+
+        return enqueues;
     }
 
     public boolean isQueueDeleted()
@@ -412,21 +413,21 @@ public abstract class QueueEntryImpl imp
         return getQueue().isDeleted();
     }
 
-    public void addStateChangeListener(StateChangeListener listener)
+    public void addStateChangeListener(StateChangeListener<? super E,State> listener)
     {
-        Set<StateChangeListener> listeners = _stateChangeListeners;
+        Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
         if(listeners == null)
         {
-            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
+            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<? super E, State>>());
             listeners = _stateChangeListeners;
         }
 
         listeners.add(listener);
     }
 
-    public boolean removeStateChangeListener(StateChangeListener listener)
+    public boolean removeStateChangeListener(StateChangeListener<? super E, State> listener)
     {
-        Set<StateChangeListener> listeners = _stateChangeListeners;
+        Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
         if(listeners != null)
         {
             return listeners.remove(listener);
@@ -436,9 +437,9 @@ public abstract class QueueEntryImpl imp
     }
 
 
-    public int compareTo(final QueueEntry o)
+    public int compareTo(final E o)
     {
-        QueueEntryImpl other = (QueueEntryImpl)o;
+        E other = o;
         return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
     }
 
@@ -446,7 +447,7 @@ public abstract class QueueEntryImpl imp
     {
     }
 
-    public QueueEntryList getQueueEntryList()
+    public L getQueueEntryList()
     {
         return _queueEntryList;
     }
@@ -461,6 +462,12 @@ public abstract class QueueEntryImpl imp
         return _deliveryCount;
     }
 
+    @Override
+    public int getMaximumDeliveryCount()
+    {
+        return getQueue().getMaximumDeliveryCount();
+    }
+
     public void incrementDeliveryCount()
     {
         _deliveryCountUpdater.incrementAndGet(this);
@@ -485,6 +492,23 @@ public abstract class QueueEntryImpl imp
                 '}';
     }
 
+    @Override
+    public boolean resend() throws AMQException
+    {
+        QueueConsumer<?,E,Q,L> sub = getDeliveredConsumer();
+        if(sub != null)
+        {
+            return sub.resend((E)this);
+        }
+        return false;
+    }
+
+    @Override
+    public TransactionLogResource getOwningResource()
+    {
+        return getQueue();
+    }
+
     private static class EntryInstanceProperties implements InstanceProperties
     {
         private final EnumMap<Property, Object> _properties = new EnumMap<Property, Object>(Property.class);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java Wed Feb 12 13:27:51 2014
@@ -20,11 +20,13 @@
 */
 package org.apache.qpid.server.queue;
 
-public interface QueueEntryIterator<QE extends QueueEntry>
+import org.apache.qpid.server.consumer.Consumer;
+
+public interface QueueEntryIterator<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
 {
     boolean atTail();
 
-    QE getNode();
+    E getNode();
 
     boolean advance();
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Wed Feb 12 13:27:51 2014
@@ -20,21 +20,23 @@
 */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.message.ServerMessage;
 
-public interface QueueEntryList<Q extends QueueEntry>
+public interface QueueEntryList<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
 {
-    AMQQueue getQueue();
+    Q getQueue();
 
-    Q add(ServerMessage message);
+    E add(ServerMessage message);
 
-    Q next(Q node);
+    E next(E node);
 
-    QueueEntryIterator<Q> iterator();
+    QueueEntryIterator<E,Q,L,C> iterator();
 
-    Q getHead();
+    E getHead();
 
-    void entryDeleted(Q queueEntry);
+    void entryDeleted(E queueEntry);
     
     int getPriorities();
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java Wed Feb 12 13:27:51 2014
@@ -20,7 +20,7 @@
 */
 package org.apache.qpid.server.queue;
 
-interface QueueEntryListFactory
+interface QueueEntryListFactory<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>>
 {
-    public QueueEntryList createQueueEntryList(AMQQueue queue);
+    public L createQueueEntryList(Q queue);
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java Wed Feb 12 13:27:51 2014
@@ -16,7 +16,9 @@
 */
 package org.apache.qpid.server.queue;
 
-public interface QueueEntryVisitor
+import org.apache.qpid.server.consumer.Consumer;
+
+public interface QueueEntryVisitor<E extends QueueEntry>
 {
-    boolean visit(QueueEntry entry);
+    boolean visit(E entry);
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Wed Feb 12 13:27:51 2014
@@ -32,8 +32,8 @@ import org.apache.qpid.transport.Transpo
 
 /**
  * QueueRunners are Runnables used to process a queue when requiring
- * asynchronous message delivery to subscriptions, which is necessary
- * when straight-through delivery of a message to a subscription isn't
+ * asynchronous message delivery to consumers, which is necessary
+ * when straight-through delivery of a message to a consumer isn't
  * possible during the enqueue operation.
  */
 public class QueueRunner implements Runnable



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org