You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/02/12 14:27:57 UTC

svn commit: r1567616 [9/12] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/ qpid/cpp/bindings/qmf2/ruby/ qpid/cpp/bindings/qpid/examples/perl/ qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/lib/qpid/messaging/ qpid/cpp/bindings/qpid/ruby/ qp...

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Feb 12 13:27:51 2014
@@ -21,19 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -42,6 +30,7 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -55,6 +44,10 @@ import org.apache.qpid.server.Transactio
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.filter.SimpleFilterManager;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.logging.LogActor;
@@ -66,25 +59,28 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.TransportException;
 
@@ -122,7 +118,7 @@ public class AMQChannel implements AMQSe
     private IncomingMessage _currentMessage;
 
     /** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
-    private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
+    private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
 
     private final MessageStore _messageStore;
 
@@ -155,7 +151,7 @@ public class AMQChannel implements AMQSe
     private volatile boolean _rollingBack;
 
     private static final Runnable NULL_TASK = new Runnable() { public void run() {} };
-    private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
+    private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
     private static final
     AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
     private long _createTime = System.currentTimeMillis();
@@ -266,7 +262,7 @@ public class AMQChannel implements AMQSe
         return _channelId;
     }
 
-    public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException
+    public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) throws AMQSecurityException
     {
         String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
         SecurityManager securityManager = getVirtualHost().getSecurityManager();
@@ -275,7 +271,7 @@ public class AMQChannel implements AMQSe
             throw new AMQSecurityException("Permission denied: " + e.getName());
         }
         _currentMessage = new IncomingMessage(info);
-        _currentMessage.setExchange(e);
+        _currentMessage.setMessageDestination(e);
     }
 
     public void publishContentHeader(ContentHeaderBody contentHeaderBody)
@@ -360,7 +356,7 @@ public class AMQChannel implements AMQSe
                                     }
                                 };
 
-                        int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction,
+                        int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction,
                                                                           immediate ? _immediateAction : _capacityCheckAction);
                         if(enqueues == 0)
                         {
@@ -497,41 +493,64 @@ public class AMQChannel implements AMQSe
     }
 
 
-    public Subscription getSubscription(AMQShortString subscription)
+    public Consumer getSubscription(AMQShortString tag)
     {
-        return _tag2SubscriptionMap.get(subscription);
+        final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
+        return target == null ? null : target.getConsumer();
     }
 
     /**
      * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
      * up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
      *
+     *
      * @param tag       the tag chosen by the client (if null, server will generate one)
-     * @param queue     the queue to subscribe to
+     * @param source     the queue to subscribe to
      * @param acks      Are acks enabled for this subscriber
      * @param filters   Filters to apply to this subscriber
      *
-     * @param noLocal   Flag stopping own messages being received.
      * @param exclusive Flag requesting exclusive access to the queue
      * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
      *
      * @throws AMQException                  if something goes wrong
      */
-    public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
-                                           FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+    public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
+                                            FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException
     {
         if (tag == null)
         {
             tag = new AMQShortString("sgen_" + getNextConsumerTag());
         }
 
-        if (_tag2SubscriptionMap.containsKey(tag))
+        if (_tag2SubscriptionTargetMap.containsKey(tag))
         {
             throw new AMQException("Consumer already exists with same tag: " + tag);
         }
 
-         Subscription subscription =
-                SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
+        ConsumerTarget_0_8 target;
+        EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+
+        if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
+        {
+            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+        }
+        else if(acks)
+        {
+            target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+            options.add(Consumer.Option.ACQUIRES);
+            options.add(Consumer.Option.SEES_REQUEUES);
+        }
+        else
+        {
+            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+            options.add(Consumer.Option.ACQUIRES);
+            options.add(Consumer.Option.SEES_REQUEUES);
+        }
+
+        if(exclusive)
+        {
+            options.add(Consumer.Option.EXCLUSIVE);
+        }
 
 
         // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
@@ -539,20 +558,34 @@ public class AMQChannel implements AMQSe
         // so calling _cT2QM.remove before we have done put which was after the register succeeded.
         // So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
 
-        _tag2SubscriptionMap.put(tag, subscription);
+        _tag2SubscriptionTargetMap.put(tag, target);
 
         try
         {
-            queue.registerSubscription(subscription, exclusive);
+            FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
+            if(noLocal)
+            {
+                if(filterManager == null)
+                {
+                    filterManager = new SimpleFilterManager();
+                }
+                filterManager.add(new FilterSupport.NoLocalFilter(source));
+            }
+            Consumer sub =
+                    source.addConsumer(target,
+                                       filterManager,
+                                      AMQMessage.class,
+                                      AMQShortString.toString(tag),
+                                      options);
         }
         catch (AMQException e)
         {
-            _tag2SubscriptionMap.remove(tag);
+            _tag2SubscriptionTargetMap.remove(tag);
             throw e;
         }
         catch (RuntimeException e)
         {
-            _tag2SubscriptionMap.remove(tag);
+            _tag2SubscriptionTargetMap.remove(tag);
             throw e;
         }
         return tag;
@@ -567,18 +600,11 @@ public class AMQChannel implements AMQSe
     public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
     {
 
-        Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
+        ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
+        Consumer sub = target == null ? null : target.getConsumer();
         if (sub != null)
         {
-            try
-            {
-                sub.getSendLock();
-                sub.getQueue().unregisterSubscription(sub);
-            }
-            finally
-            {
-                sub.releaseSendLock();
-            }
+            sub.close();
             return true;
         }
         else
@@ -633,7 +659,7 @@ public class AMQChannel implements AMQSe
     {
         if (_logger.isInfoEnabled())
         {
-            if (!_tag2SubscriptionMap.isEmpty())
+            if (!_tag2SubscriptionTargetMap.isEmpty())
             {
                 _logger.info("Unsubscribing all consumers on channel " + toString());
             }
@@ -643,28 +669,21 @@ public class AMQChannel implements AMQSe
             }
         }
 
-        for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
+        for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
         {
             if (_logger.isInfoEnabled())
             {
                 _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
             }
 
-            Subscription sub = me.getValue();
+            Consumer sub = me.getValue().getConsumer();
 
-            try
-            {
-                sub.getSendLock();
-                sub.getQueue().unregisterSubscription(sub);
-            }
-            finally
-            {
-                sub.releaseSendLock();
-            }
+
+            sub.close();
 
         }
 
-        _tag2SubscriptionMap.clear();
+        _tag2SubscriptionTargetMap.clear();
     }
 
     /**
@@ -673,24 +692,15 @@ public class AMQChannel implements AMQSe
      * @param entry       the record of the message on the queue that was delivered
      * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
      *                    delivery tag)
-     * @param subscription The consumer that is to acknowledge this message.
+     * @param consumer The consumer that is to acknowledge this message.
      */
-    public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
+    public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer)
     {
         if (_logger.isDebugEnabled())
         {
-            if (entry.getQueue() == null)
-            {
-                _logger.debug("Adding unacked message with a null queue:" + entry);
-            }
-            else
-            {
-                if (_logger.isDebugEnabled())
-                {
                     _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
-                               + ") with a queue(" + entry.getQueue() + ") for " + subscription);
-                }
-            }
+                               + ") for " + consumer + " on " + entry.getOwningResource().getName());
+
         }
 
         _unacknowledgedMessageMap.add(deliveryTag, entry);
@@ -713,7 +723,7 @@ public class AMQChannel implements AMQSe
     public void requeue() throws AMQException
     {
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
-        Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+        Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
 
         if (!messagesToBeDelivered.isEmpty())
         {
@@ -724,21 +734,13 @@ public class AMQChannel implements AMQSe
 
         }
 
-        for (QueueEntry unacked : messagesToBeDelivered)
+        for (MessageInstance unacked : messagesToBeDelivered)
         {
-            if (!unacked.isQueueDeleted())
-            {
-                // Mark message redelivered
-                unacked.setRedelivered();
-
-                // Ensure message is released for redelivery
-                unacked.release();
+            // Mark message redelivered
+            unacked.setRedelivered();
 
-            }
-            else
-            {
-                unacked.delete();
-            }
+            // Ensure message is released for redelivery
+            unacked.release();
         }
 
     }
@@ -752,7 +754,7 @@ public class AMQChannel implements AMQSe
      */
     public void requeue(long deliveryTag) throws AMQException
     {
-        QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag);
+        MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag);
 
         if (unacked != null)
         {
@@ -760,20 +762,8 @@ public class AMQChannel implements AMQSe
             unacked.setRedelivered();
 
             // Ensure message is released for redelivery
-            if (!unacked.isQueueDeleted())
-            {
+            unacked.release();
 
-                // Ensure message is released for redelivery
-                unacked.release();
-
-            }
-            else
-            {
-                _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
-                          + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
-
-                unacked.delete();
-            }
         }
         else
         {
@@ -786,10 +776,10 @@ public class AMQChannel implements AMQSe
 
     public boolean isMaxDeliveryCountEnabled(final long deliveryTag)
     {
-        final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+        final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
         if (queueEntry != null)
         {
-            final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+            final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
             return maximumDeliveryCount > 0;
         }
 
@@ -798,10 +788,10 @@ public class AMQChannel implements AMQSe
 
     public boolean isDeliveredTooManyTimes(final long deliveryTag)
     {
-        final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+        final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
         if (queueEntry != null)
         {
-            final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+            final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
             final int numDeliveries = queueEntry.getDeliveryCount();
             return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
         }
@@ -812,16 +802,14 @@ public class AMQChannel implements AMQSe
     /**
      * Called to resend all outstanding unacknowledged messages to this same channel.
      *
-     * @param requeue Are the messages to be requeued or dropped.
-     *
      * @throws AMQException When something goes wrong.
      */
-    public void resend(final boolean requeue) throws AMQException
+    public void resend() throws AMQException
     {
 
 
-        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
-        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+        final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+        final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
 
         if (_logger.isDebugEnabled())
         {
@@ -833,9 +821,8 @@ public class AMQChannel implements AMQSe
         // and those that don't to be requeued.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap,
                                                                     msgToRequeue,
-                                                                    msgToResend,
-                                                                    requeue,
-                                                                    _messageStore));
+                                                                    msgToResend
+        ));
 
 
         // Process Messages to Resend
@@ -851,39 +838,20 @@ public class AMQChannel implements AMQSe
             }
         }
 
-        for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
+        for (Map.Entry<Long, MessageInstance> entry : msgToResend.entrySet())
         {
-            QueueEntry message = entry.getValue();
+            MessageInstance message = entry.getValue();
             long deliveryTag = entry.getKey();
 
             //Amend the delivery counter as the client hasn't seen these messages yet.
             message.decrementDeliveryCount();
 
-            AMQQueue queue = message.getQueue();
-
             // Without any details from the client about what has been processed we have to mark
             // all messages in the unacked map as redelivered.
             message.setRedelivered();
 
-            Subscription sub = message.getDeliveredSubscription();
-
-            if (sub != null)
-            {
-
-                if(!queue.resend(message,sub))
-                {
-                    msgToRequeue.put(deliveryTag, message);
-                }
-            }
-            else
+            if (!message.resend())
             {
-
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
-                              + ")to prevent loss");
-                }
-                // move this message to requeue
                 msgToRequeue.put(deliveryTag, message);
             }
         } // for all messages
@@ -898,9 +866,9 @@ public class AMQChannel implements AMQSe
         }
 
         // Process Messages to Requeue at the front of the queue
-        for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
+        for (Map.Entry<Long, MessageInstance> entry : msgToRequeue.entrySet())
         {
-            QueueEntry message = entry.getValue();
+            MessageInstance message = entry.getValue();
             long deliveryTag = entry.getKey();
 
             //Amend the delivery counter as the client hasn't seen these messages yet.
@@ -926,11 +894,11 @@ public class AMQChannel implements AMQSe
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
     {
-        Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
+        Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
         _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
     }
 
-    private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
+    private Collection<MessageInstance> getAckedMessages(long deliveryTag, boolean multiple)
     {
 
         return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
@@ -976,9 +944,9 @@ public class AMQChannel implements AMQSe
             if (wasSuspended)
             {
                 // may need to deliver queued messages
-                for (Subscription s : _tag2SubscriptionMap.values())
+                for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
                 {
-                    s.getQueue().deliverAsync(s);
+                    s.getConsumer().externalStateChange();
                 }
             }
 
@@ -992,15 +960,15 @@ public class AMQChannel implements AMQSe
             if (!wasSuspended)
             {
                 // may need to deliver queued messages
-                for (Subscription s : _tag2SubscriptionMap.values())
+                for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
                 {
                     try
                     {
-                        s.getSendLock();
+                        s.getConsumer().getSendLock();
                     }
                     finally
                     {
-                        s.releaseSendLock();
+                        s.getConsumer().releaseSendLock();
                     }
                 }
             }
@@ -1077,10 +1045,10 @@ public class AMQChannel implements AMQSe
         boolean requiresSuspend = _suspended.compareAndSet(false,true);
 
         // ensure all subscriptions have seen the change to the channel state
-        for(Subscription sub : _tag2SubscriptionMap.values())
+        for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
         {
-            sub.getSendLock();
-            sub.releaseSendLock();
+            sub.getConsumer().getSendLock();
+            sub.getConsumer().releaseSendLock();
         }
 
         try
@@ -1098,16 +1066,16 @@ public class AMQChannel implements AMQSe
 
         postRollbackTask.run();
 
-        for(QueueEntry entry : _resendList)
+        for(MessageInstance entry : _resendList)
         {
-            Subscription sub = entry.getDeliveredSubscription();
+            Consumer sub = entry.getDeliveredConsumer();
             if(sub == null || sub.isClosed())
             {
                 entry.release();
             }
             else
             {
-                sub.getQueue().resend(entry, sub);
+                entry.resend();
             }
         }
         _resendList.clear();
@@ -1115,9 +1083,9 @@ public class AMQChannel implements AMQSe
         if(requiresSuspend)
         {
             _suspended.set(false);
-            for(Subscription sub : _tag2SubscriptionMap.values())
+            for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
             {
-                sub.getQueue().deliverAsync(sub);
+                sub.getConsumer().externalStateChange();
             }
 
         }
@@ -1173,7 +1141,7 @@ public class AMQChannel implements AMQSe
     private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
         {
 
-            public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
             {
                 addUnacknowledgedMessage(entry, deliveryTag, sub);
             }
@@ -1233,140 +1201,97 @@ public class AMQChannel implements AMQSe
         return getId().compareTo(o.getId());
     }
 
-    private class MessageDeliveryAction implements ServerTransaction.Action
-    {
-        private final MessageReference<AMQMessage> _reference;
-        private List<? extends BaseQueue> _destinationQueues;
-
-        public MessageDeliveryAction(AMQMessage currentMessage,
-                                     List<? extends BaseQueue> destinationQueues)
-        {
-            _reference = currentMessage.newReference();
-            _destinationQueues = destinationQueues;
-        }
 
-        public void postCommit()
-        {
-            try
-            {
-                AMQMessage message = _reference.getMessage();
-                final boolean immediate = message.isImmediate();
-
-                for(int i = 0; i < _destinationQueues.size(); i++)
-                {
-                    BaseQueue queue = _destinationQueues.get(i);
-
-                    BaseQueue.PostEnqueueAction action;
-
-                    if(immediate)
-                    {
-                        action = new ImmediateAction();
-                    }
-                    else
-                    {
-                        action = null;
-                    }
-
-                    queue.enqueue(message, isTransactional(), action);
-
-                    if(queue instanceof AMQQueue)
-                    {
-                        ((AMQQueue)queue).checkCapacity(AMQChannel.this);
-                    }
-
-                }
-
-                message.getStoredMessage().flushToStore();
-                _reference.release();
-            }
-            catch (AMQException e)
-            {
-                // TODO
-                throw new RuntimeException(e);
-            }
-        }
-
-        public void onRollback()
-        {
-            // Maybe keep track of entries that were created and then delete them here in case of failure
-            // to in memory enqueue
-            _reference.release();
-        }
-
-
-    }
-    private class ImmediateAction implements BaseQueue.PostEnqueueAction
+    private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>>
     {
 
         public ImmediateAction()
         {
         }
 
-        public void onEnqueue(QueueEntry entry)
+        public void performAction(MessageInstance<?,C> entry)
         {
-            AMQQueue queue = entry.getQueue();
+            TransactionLogResource queue = entry.getOwningResource();
 
             if (!entry.getDeliveredToConsumer() && entry.acquire())
             {
 
                 ServerTransaction txn = new LocalTransaction(_messageStore);
-                Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
-                entries.add(entry);
                 final AMQMessage message = (AMQMessage) entry.getMessage();
-                txn.dequeue(queue, entry.getMessage(),
-                            new MessageAcknowledgeAction(entries)
-                            {
-                                @Override
-                                public void postCommit()
+                MessageReference ref = message.newReference();
+                try
+                {
+                    entry.delete();
+                    txn.dequeue(queue, message,
+                                new ServerTransaction.Action()
                                 {
-                                    try
+                                    @Override
+                                    public void postCommit()
                                     {
-                                        final
-                                        ProtocolOutputConverter outputConverter =
-                                                _session.getProtocolOutputConverter();
-
-                                        outputConverter.writeReturn(message.getMessagePublishInfo(),
-                                                                    message.getContentHeaderBody(),
-                                                                    message,
-                                                                    _channelId,
-                                                                    AMQConstant.NO_CONSUMERS.getCode(),
-                                                                    IMMEDIATE_DELIVERY_REPLY_TEXT);
+                                        try
+                                        {
+                                            final
+                                            ProtocolOutputConverter outputConverter =
+                                                    _session.getProtocolOutputConverter();
+
+                                            outputConverter.writeReturn(message.getMessagePublishInfo(),
+                                                                        message.getContentHeaderBody(),
+                                                                        message,
+                                                                        _channelId,
+                                                                        AMQConstant.NO_CONSUMERS.getCode(),
+                                                                        IMMEDIATE_DELIVERY_REPLY_TEXT);
+                                        }
+                                        catch (AMQException e)
+                                        {
+                                            throw new RuntimeException(e);
+                                        }
                                     }
-                                    catch (AMQException e)
+
+                                    @Override
+                                    public void onRollback()
                                     {
-                                        throw new RuntimeException(e);
+
                                     }
-                                    super.postCommit();
                                 }
-                            }
-                           );
-                txn.commit();
+                               );
+                    txn.commit();
+                }
+                finally
+                {
+                    ref.release();
+                }
 
 
             }
             else
             {
-                queue.checkCapacity(AMQChannel.this);
+                if(queue instanceof CapacityChecker)
+                {
+                    ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+                }
             }
 
         }
     }
 
-    private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction
+    private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<?,C>>
     {
         @Override
-        public void onEnqueue(final QueueEntry entry)
+        public void performAction(final MessageInstance<?,C> entry)
         {
-            AMQQueue queue = entry.getQueue();
-            queue.checkCapacity(AMQChannel.this);
+            TransactionLogResource queue = entry.getOwningResource();
+            if(queue instanceof CapacityChecker)
+            {
+                ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+            }
         }
     }
 
     private class MessageAcknowledgeAction implements ServerTransaction.Action
     {
-        private final Collection<QueueEntry> _ackedMessages;
+        private final Collection<MessageInstance> _ackedMessages;
 
-        public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
+        public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages)
         {
             _ackedMessages = ackedMessages;
         }
@@ -1375,7 +1300,7 @@ public class AMQChannel implements AMQSe
         {
             try
             {
-                for(QueueEntry entry : _ackedMessages)
+                for(MessageInstance entry : _ackedMessages)
                 {
                     entry.delete();
                 }
@@ -1398,10 +1323,10 @@ public class AMQChannel implements AMQSe
             {
                 try
                 {
-                        for(QueueEntry entry : _ackedMessages)
-                        {
-                            entry.release();
-                        }
+                    for(MessageInstance entry : _ackedMessages)
+                    {
+                        entry.release();
+                    }
                 }
                 finally
                 {
@@ -1566,7 +1491,7 @@ public class AMQChannel implements AMQSe
     public void deadLetter(long deliveryTag) throws AMQException
     {
         final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
-        final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag);
+        final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
 
         if (rejectedQueueEntry == null)
         {
@@ -1575,36 +1500,42 @@ public class AMQChannel implements AMQSe
         else
         {
             final ServerMessage msg = rejectedQueueEntry.getMessage();
+            final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
 
-            int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+            int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
                 {
                     @Override
-                    public void onEnqueue(final QueueEntry requeueEntry)
+                    public void performAction(final MessageInstance requeueEntry)
                     {
                         _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
-                                                                                   requeueEntry.getQueue().getName()));
+                                                                                   requeueEntry.getOwningResource().getName()));
                     }
                 }, null);
 
             if(requeues == 0)
             {
-                final AMQQueue queue = rejectedQueueEntry.getQueue();
 
-                final Exchange altExchange = queue.getAlternateExchange();
-
-                if (altExchange == null)
+                final TransactionLogResource owningResource = rejectedQueueEntry.getOwningResource();
+                if(owningResource instanceof AMQQueue)
                 {
-                    _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
-                    _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+                    final AMQQueue queue = (AMQQueue) owningResource;
 
-                }
-                else
-                {
-                    _logger.debug(
-                            "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
-                            + deliveryTag);
-                    _actor.message(_logSubject,
-                                   ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+                    final Exchange altExchange = queue.getAlternateExchange();
+
+                    if (altExchange == null)
+                    {
+                        _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+                        _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+
+                    }
+                    else
+                    {
+                        _logger.debug(
+                                "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+                                + deliveryTag);
+                        _actor.message(_logSubject,
+                                       ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+                    }
                 }
             }
 
@@ -1665,6 +1596,6 @@ public class AMQChannel implements AMQSe
     @Override
     public int getConsumerCount()
     {
-        return _tag2SubscriptionMap.size();
+        return _tag2SubscriptionTargetMap.size();
     }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Wed Feb 12 13:27:51 2014
@@ -94,8 +94,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
@@ -1669,7 +1668,7 @@ public class AMQProtocolEngine implement
         }
 
         @Override
-        public void deliverToClient(final Subscription sub, final ServerMessage message,
+        public void deliverToClient(final Consumer sub, final ServerMessage message,
                                     final InstanceProperties props, final long deliveryTag)
                 throws AMQException
         {
@@ -1678,7 +1677,7 @@ public class AMQProtocolEngine implement
                                                   props,
                                                   _channelId,
                                                   deliveryTag,
-                                                  ((SubscriptionImpl)sub).getConsumerTag());
+                                                  new AMQShortString(sub.getName()));
         }
 
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java Wed Feb 12 13:27:51 2014
@@ -39,7 +39,6 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Wed Feb 12 13:27:51 2014
@@ -23,11 +23,8 @@ package org.apache.qpid.server.protocol.
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.consumer.Consumer;
 
 import java.util.Map;
 
@@ -35,34 +32,28 @@ public class ExtractResendAndRequeue imp
 {
     private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
 
-    private final Map<Long, QueueEntry> _msgToRequeue;
-    private final Map<Long, QueueEntry> _msgToResend;
-    private final boolean _requeueIfUnableToResend;
+    private final Map<Long, MessageInstance> _msgToRequeue;
+    private final Map<Long, MessageInstance> _msgToResend;
     private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
-    private final MessageStore _transactionLog;
 
     public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
-                                   Map<Long, QueueEntry> msgToRequeue,
-                                   Map<Long, QueueEntry> msgToResend,
-                                   boolean requeueIfUnableToResend,
-                                   MessageStore txnLog)
+                                   Map<Long, MessageInstance> msgToRequeue,
+                                   Map<Long, MessageInstance> msgToResend)
     {
         _unacknowledgedMessageMap = unacknowledgedMessageMap;
         _msgToRequeue = msgToRequeue;
         _msgToResend = msgToResend;
-        _requeueIfUnableToResend = requeueIfUnableToResend;
-        _transactionLog = txnLog;
     }
 
-    public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+    public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException
     {
 
         message.setRedelivered();
-        final Subscription subscription = message.getDeliveredSubscription();
-        if (subscription != null)
+        final Consumer consumer = message.getDeliveredConsumer();
+        if (consumer != null)
         {
             // Consumer exists
-            if (!subscription.isClosed())
+            if (!consumer.isClosed())
             {
                 _msgToResend.put(deliveryTag, message);
             }
@@ -73,58 +64,13 @@ public class ExtractResendAndRequeue imp
         }
         else
         {
-            // Message has no consumer tag, so was "delivered" to a GET
-            // or consumer no longer registered
-            // cannot resend, so re-queue.
-            if (!message.isQueueDeleted())
-            {
-                if (_requeueIfUnableToResend)
-                {
-                    _msgToRequeue.put(deliveryTag, message);
-                }
-                else
-                {
-
-                    dequeueEntry(message);
-                    _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
-                }
-            }
-            else
-            {
-                dequeueEntry(message);
-                _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
-            }
+            _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
         }
 
         // false means continue processing
         return false;
     }
 
-
-    private void dequeueEntry(final QueueEntry node)
-    {
-        ServerTransaction txn = new AutoCommitTransaction(_transactionLog);
-        dequeueEntry(node, txn);
-    }
-
-    private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
-    {
-        txn.dequeue(node.getQueue(), node.getMessage(),
-                    new ServerTransaction.Action()
-                    {
-
-                        public void postCommit()
-                        {
-                            node.delete();
-                        }
-
-                        public void onRollback()
-                        {
-
-                        }
-                    });
-    }
-
     public void visitComplete()
     {
         _unacknowledgedMessageMap.clear();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java Wed Feb 12 13:27:51 2014
@@ -20,15 +20,12 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.MessageDestination;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -38,7 +35,7 @@ public class IncomingMessage
 
     private final MessagePublishInfo _messagePublishInfo;
     private ContentHeaderBody _contentHeaderBody;
-    private Exchange _exchange;
+    private MessageDestination _messageDestination;
 
     /**
      * Keeps a track of how many bytes we have received in body frames
@@ -77,9 +74,9 @@ public class IncomingMessage
         return _messagePublishInfo.getExchange();
     }
 
-    public Exchange getExchange()
+    public MessageDestination getDestination()
     {
-        return _exchange;
+        return _messageDestination;
     }
 
     public ContentHeaderBody getContentHeader()
@@ -92,9 +89,9 @@ public class IncomingMessage
         return getContentHeader().getBodySize();
     }
 
-    public void setExchange(final Exchange e)
+    public void setMessageDestination(final MessageDestination e)
     {
-        _exchange = e;
+        _messageDestination = e;
     }
 
     public int getBodyCount() throws AMQException

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Wed Feb 12 13:27:51 2014
@@ -105,7 +105,7 @@ public class MessageMetaData implements 
     }
 
 
-    public int writeToBuffer(int offset, ByteBuffer dest)
+    public int writeToBuffer(ByteBuffer dest)
     {
         int oldPosition = dest.position();
         try

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Wed Feb 12 13:27:51 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.QueueEntry;
 
 import java.util.Collection;
@@ -36,24 +37,24 @@ public interface UnacknowledgedMessageMa
          *@param message the message being iterated over @return true to stop iteration, false to continue
          * @throws AMQException
          */
-        boolean callback(final long deliveryTag, QueueEntry message) throws AMQException;
+        boolean callback(final long deliveryTag, MessageInstance message) throws AMQException;
 
         void visitComplete();
     }
 
     void visit(Visitor visitor) throws AMQException;
 
-    void add(long deliveryTag, QueueEntry message);
+    void add(long deliveryTag, MessageInstance message);
 
-    QueueEntry remove(long deliveryTag);
+    MessageInstance remove(long deliveryTag);
 
-    Collection<QueueEntry> cancelAllMessages();
+    Collection<MessageInstance> cancelAllMessages();
 
     int size();
 
     void clear();
 
-    QueueEntry get(long deliveryTag);
+    MessageInstance get(long deliveryTag);
 
     /**
      * Get the set of delivery tags that are outstanding.
@@ -62,7 +63,7 @@ public interface UnacknowledgedMessageMa
      */
     Set<Long> getDeliveryTags();
 
-    Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple);
+    Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
 
 }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Wed Feb 12 13:27:51 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.QueueEntry;
 
 import java.util.Collection;
@@ -34,7 +35,7 @@ public class UnacknowledgedMessageMapImp
 
     private long _unackedSize;
 
-    private Map<Long, QueueEntry> _map;
+    private Map<Long, MessageInstance> _map;
 
     private long _lastDeliveryTag;
 
@@ -43,10 +44,10 @@ public class UnacknowledgedMessageMapImp
     public UnacknowledgedMessageMapImpl(int prefetchLimit)
     {
         _prefetchLimit = prefetchLimit;
-        _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
+        _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit);
     }
 
-    public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
+    public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
     {
         if (multiple)
         {
@@ -54,7 +55,7 @@ public class UnacknowledgedMessageMapImp
         }
         else
         {
-            final QueueEntry entry = get(deliveryTag);
+            final MessageInstance entry = get(deliveryTag);
             if(entry != null)
             {
                 msgs.put(deliveryTag, entry);
@@ -63,7 +64,7 @@ public class UnacknowledgedMessageMapImp
 
     }
 
-    public void remove(Map<Long,QueueEntry> msgs)
+    public void remove(Map<Long,MessageInstance> msgs)
     {
         synchronized (_lock)
         {
@@ -74,12 +75,12 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public QueueEntry remove(long deliveryTag)
+    public MessageInstance remove(long deliveryTag)
     {
         synchronized (_lock)
         {
 
-            QueueEntry message = _map.remove(deliveryTag);
+            MessageInstance message = _map.remove(deliveryTag);
             if(message != null)
             {
                 _unackedSize -= message.getMessage().getSize();
@@ -94,8 +95,8 @@ public class UnacknowledgedMessageMapImp
     {
         synchronized (_lock)
         {
-            Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
-            for (Map.Entry<Long, QueueEntry> entry : currentEntries)
+            Set<Map.Entry<Long, MessageInstance>> currentEntries = _map.entrySet();
+            for (Map.Entry<Long, MessageInstance> entry : currentEntries)
             {
                 visitor.callback(entry.getKey().longValue(), entry.getValue());
             }
@@ -103,7 +104,7 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public void add(long deliveryTag, QueueEntry message)
+    public void add(long deliveryTag, MessageInstance message)
     {
         synchronized (_lock)
         {
@@ -113,12 +114,12 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public Collection<QueueEntry> cancelAllMessages()
+    public Collection<MessageInstance> cancelAllMessages()
     {
         synchronized (_lock)
         {
-            Collection<QueueEntry> currentEntries = _map.values();
-            _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
+            Collection<MessageInstance> currentEntries = _map.values();
+            _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit);
             _unackedSize = 0l;
             return currentEntries;
         }
@@ -141,7 +142,7 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public QueueEntry get(long key)
+    public MessageInstance get(long key)
     {
         synchronized (_lock)
         {
@@ -157,19 +158,19 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple)
+    public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple)
     {
-        Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
+        Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>();
         collect(deliveryTag, multiple, ackedMessageMap);
         remove(ackedMessageMap);
         return ackedMessageMap.values();
     }
 
-    private void collect(long key, Map<Long, QueueEntry> msgs)
+    private void collect(long key, Map<Long, MessageInstance> msgs)
     {
         synchronized (_lock)
         {
-            for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
+            for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
             {
                 msgs.put(entry.getKey(),entry.getValue());
                 if (entry.getKey() == key)

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Wed Feb 12 13:27:51 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.BasicConsumeBody;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -73,7 +74,7 @@ public class BasicConsumeMethodHandler i
                               " args:" + body.getArguments());
             }
 
-            AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
+            MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
 
             if (queue == null)
             {
@@ -120,8 +121,12 @@ public class BasicConsumeMethodHandler i
                     if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
                     {
 
-                        AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
-                                                                              body.getArguments(), body.getNoLocal(), body.getExclusive());
+                        AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
+                                                                               queue,
+                                                                               !body.getNoAck(),
+                                                                               body.getArguments(),
+                                                                               body.getExclusive(),
+                                                                               body.getNoLocal());
                         if (!body.getNowait())
                         {
                             MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
@@ -156,14 +161,14 @@ public class BasicConsumeMethodHandler i
 
 
                 }
-                catch (AMQQueue.ExistingExclusiveSubscription e)
+                catch (AMQQueue.ExistingExclusiveConsumer e)
                 {
                     throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
                                                    "Cannot subscribe to queue "
                                                    + queue.getName()
                                                    + " as it already has an existing exclusive consumer");
                 }
-                catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+                catch (AMQQueue.ExistingConsumerPreventsExclusive e)
                 {
                     throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
                                                    "Cannot subscribe to queue "

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Wed Feb 12 13:27:51 2014
@@ -24,27 +24,31 @@ package org.apache.qpid.server.protocol.
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicGetBody;
 import org.apache.qpid.framing.BasicGetEmptyBody;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl;
+import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.EnumSet;
+
 public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
 {
     private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
@@ -124,50 +128,79 @@ public class BasicGetMethodHandler imple
 
         final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
 
-        final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
-        {
-
-            @Override
-            public void deliverToClient(final Subscription sub, final ServerMessage message, final
-                                        InstanceProperties props, final long deliveryTag)
-            throws AMQException
-            {
-                singleMessageCredit.useCreditForMessage(message.getSize());
-                session.getProtocolOutputConverter().writeGetOk(message,
-                                                                props,
-                                                                channel.getChannelId(),
-                                                                deliveryTag,
-                                                                queue.getMessageCount());
-
-
-            }
-        };
+        final GetDeliveryMethod getDeliveryMethod =
+                new GetDeliveryMethod(singleMessageCredit, session, channel, queue);
         final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
         {
 
-            public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+            public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
             {
                 channel.addUnacknowledgedMessage(entry, deliveryTag, null);
             }
         };
 
-        Subscription sub;
+        ConsumerTarget_0_8 target;
+        EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES,
+                                                          Consumer.Option.SEES_REQUEUES);
         if(acks)
         {
-            sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+
+            target = ConsumerTarget_0_8.createAckTarget(channel,
+                                                        AMQShortString.EMPTY_STRING, null,
+                                                        singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
         else
         {
-            sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+            target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
+                                                          AMQShortString.EMPTY_STRING, null,
+                                                          singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
 
-        queue.registerSubscription(sub,false);
-        queue.flushSubscription(sub);
-        queue.unregisterSubscription(sub);
-        return(!singleMessageCredit.hasCredit());
+        Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+        sub.flush();
+        sub.close();
+        return(getDeliveryMethod.hasDeliveredMessage());
 
 
     }
 
 
+    private static class GetDeliveryMethod implements ClientDeliveryMethod
+    {
+
+        private final FlowCreditManager _singleMessageCredit;
+        private final AMQProtocolSession _session;
+        private final AMQChannel _channel;
+        private final AMQQueue _queue;
+        private boolean _deliveredMessage;
+
+        public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+                                 final AMQProtocolSession session,
+                                 final AMQChannel channel, final AMQQueue queue)
+        {
+            _singleMessageCredit = singleMessageCredit;
+            _session = session;
+            _channel = channel;
+            _queue = queue;
+        }
+
+        @Override
+        public void deliverToClient(final Consumer sub, final ServerMessage message,
+                                    final InstanceProperties props, final long deliveryTag) throws AMQException
+        {
+            _singleMessageCredit.useCreditForMessage(message.getSize());
+            _session.getProtocolOutputConverter().writeGetOk(message,
+                                                            props,
+                                                            _channel.getChannelId(),
+                                                            deliveryTag,
+                                                            _queue.getMessageCount());
+
+            _deliveredMessage = true;
+        }
+
+        public boolean hasDeliveredMessage()
+        {
+            return _deliveredMessage;
+        }
+    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java Wed Feb 12 13:27:51 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -67,7 +68,7 @@ public class BasicPublishMethodHandler i
         }
 
         VirtualHost vHost = session.getVirtualHost();
-        Exchange exch = vHost.getExchange(exchangeName.toString());
+        MessageDestination exch = vHost.getMessageDestination(exchangeName.toString());
         // if the exchange does not exist we raise a channel exception
         if (exch == null)
         {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java Wed Feb 12 13:27:51 2014
@@ -56,7 +56,7 @@ public class BasicRecoverMethodHandler i
             throw body.getChannelNotFoundException(channelId);
         }
 
-        channel.resend(body.getRequeue());
+        channel.resend();
 
         // Qpid 0-8 hacks a synchronous -ok onto recover.
         // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java Wed Feb 12 13:27:51 2014
@@ -58,7 +58,7 @@ public class BasicRecoverSyncMethodHandl
             throw body.getChannelNotFoundException(channelId);
         }
         channel.sync();
-        channel.resend(body.getRequeue());
+        channel.resend();
 
         // Qpid 0-8 hacks a synchronous -ok onto recover.
         // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java Wed Feb 12 13:27:51 2014
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -65,7 +66,7 @@ public class BasicRejectMethodHandler im
 
         long deliveryTag = body.getDeliveryTag();
 
-        QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+        MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
 
         if (message == null)
         {
@@ -73,16 +74,6 @@ public class BasicRejectMethodHandler im
         }
         else
         {
-            if (message.isQueueDeleted())
-            {
-                _logger.warn("Message's Queue has already been purged, dropping message");
-                message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
-                if(message != null)
-                {
-                    message.delete();
-                }
-                return;
-            }
 
             if (message.getMessage() == null)
             {
@@ -98,41 +89,43 @@ public class BasicRejectMethodHandler im
                               " on channel:" + channel.debugIdentity());
             }
 
-            message.reject();
-
             if (body.getRequeue())
             {
-                channel.requeue(deliveryTag);
-
                 //this requeue represents a message rejected from the pre-dispatch queue
                 //therefore we need to amend the delivery counter.
                 message.decrementDeliveryCount();
+
+                channel.requeue(deliveryTag);
             }
             else
             {
-                 final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
-                 _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
-                 if (maxDeliveryCountEnabled)
-                 {
-                     final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
-                     _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
-                     if (deliveredTooManyTimes)
-                     {
-                         channel.deadLetter(body.getDeliveryTag());
-                     }
-                     else
-                     {
-                         //this requeue represents a message rejected because of a recover/rollback that we
-                         //are not ready to DLQ. We rely on the reject command to resend from the unacked map
-                         //and therefore need to increment the delivery counter so we cancel out the effect
-                         //of the AMQChannel#resend() decrement.
-                         message.incrementDeliveryCount();
-                     }
-                 }
-                 else
-                 {
-                     channel.deadLetter(body.getDeliveryTag());
-                 }
+                // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here
+                // as it would prevent redelivery
+                // message.reject();
+
+                final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+                _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
+                if (maxDeliveryCountEnabled)
+                {
+                    final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+                    _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
+                    if (deliveredTooManyTimes)
+                    {
+                        channel.deadLetter(body.getDeliveryTag());
+                    }
+                    else
+                    {
+                        //this requeue represents a message rejected because of a recover/rollback that we
+                        //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+                        //and therefore need to increment the delivery counter so we cancel out the effect
+                        //of the AMQChannel#resend() decrement.
+                        message.incrementDeliveryCount();
+                    }
+                }
+                else
+                {
+                    channel.requeue(deliveryTag);
+                }
             }
         }
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java Wed Feb 12 13:27:51 2014
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
 import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
@@ -134,8 +135,8 @@ public class QueueDeclareHandler impleme
                             }
                         };
                         protocolConnection.addSessionCloseTask(sessionCloseTask);
-                        queue.addQueueDeleteTask(new AMQQueue.Task() {
-                            public void doTask(AMQQueue queue) throws AMQException
+                        queue.addQueueDeleteTask(new Action<AMQQueue>() {
+                            public void performAction(AMQQueue queue)
                             {
                                 protocolConnection.removeSessionCloseTask(sessionCloseTask);
                             }
@@ -245,9 +246,9 @@ public class QueueDeclareHandler impleme
 
             session.addSessionCloseTask(deleteQueueTask);
 
-            queue.addQueueDeleteTask(new AMQQueue.Task()
+            queue.addQueueDeleteTask(new Action<AMQQueue>()
             {
-                public void doTask(AMQQueue queue)
+                public void performAction(AMQQueue queue)
                 {
                     session.removeSessionCloseTask(deleteQueueTask);
                 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java Wed Feb 12 13:27:51 2014
@@ -74,7 +74,7 @@ public class TxRollbackHandler implement
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
             // Why, are we not allowed to send messages back to client before the ok method?
-            channel.resend(false);
+            channel.resend();
 
         }
         catch (AMQException e)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org