You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/26 18:34:40 UTC

svn commit: r1771508 - in /qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src: main/java/org/apache/qpid/server/protocol/v0_8/ test/java/org/apache/qpid/server/protocol/v0_8/

Author: rgodfrey
Date: Sat Nov 26 18:34:40 2016
New Revision: 1771508

URL: http://svn.apache.org/viewvc?rev=1771508&view=rev
Log:
QPID-7425 : Properly handle messages delivered due to basic-get, improve handling of ensuring that released messages were still held by the same consumer

Modified:
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1771508&r1=1771507&r2=1771508&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sat Nov 26 18:34:40 2016
@@ -299,9 +299,9 @@ public class AMQChannel
         if (acks)
         {
 
-            target = ConsumerTarget_0_8.createAckTarget(this,
-                                                        AMQShortString.EMPTY_STRING, null,
-                                                        INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
+            target = ConsumerTarget_0_8.createGetAckTarget(this,
+                                                           AMQShortString.EMPTY_STRING, null,
+                                                           INFINITE_CREDIT_CREDIT_MANAGER, getDeliveryMethod);
         }
         else
         {
@@ -935,13 +935,16 @@ public class AMQChannel
 
     /**
      * Add a message to the channel-based list of unacknowledged messages
-     *
-     * @param entry       the record of the message on the queue that was delivered
+     *  @param entry       the record of the message on the queue that was delivered
      * @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
      *                    delivery tag)
      * @param consumer The consumer that is to acknowledge this message.
+     * @param usesCredit
      */
-    public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, ConsumerImpl consumer)
+    public void addUnacknowledgedMessage(MessageInstance entry,
+                                         long deliveryTag,
+                                         ConsumerImpl consumer,
+                                         final boolean usesCredit)
     {
         if (_logger.isDebugEnabled())
         {
@@ -949,7 +952,7 @@ public class AMQChannel
                                + ") for " + consumer + " on " + entry.getOwningResource().getName());
         }
 
-        _unacknowledgedMessageMap.add(deliveryTag, entry, (ConsumerTarget_0_8) consumer.getTarget());
+        _unacknowledgedMessageMap.add(deliveryTag, entry, consumer, usesCredit);
 
     }
 
@@ -1015,15 +1018,17 @@ public class AMQChannel
      */
     public void requeue(long deliveryTag)
     {
-        MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag, true);
 
-        if (unacked != null)
+        final UnacknowledgedMessageMap.Entry entry = _unacknowledgedMessageMap.remove(deliveryTag, true);
+
+        if (entry != null)
         {
+            MessageInstance unacked = entry.getMessageInstance();
             // Mark message redelivered
             unacked.setRedelivered();
 
             // Ensure message is released for redelivery
-            unacked.release(unacked.getAcquiringConsumer());
+            unacked.release(entry.getConsumer());
         }
         else
         {
@@ -1765,19 +1770,21 @@ public class AMQChannel
     private void deadLetter(long deliveryTag)
     {
         final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
-        final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag, true);
+        final UnacknowledgedMessageMap.Entry entry = unackedMap.remove(deliveryTag, true);
 
-        if (rejectedQueueEntry == null)
+        if (entry == null)
         {
             _logger.warn("No message found, unable to DLQ delivery tag: " + deliveryTag);
         }
         else
         {
-            final ServerMessage msg = rejectedQueueEntry.getMessage();
+
+            final MessageInstance messageInstance = entry.getMessageInstance();
+            final ServerMessage msg = messageInstance.getMessage();
             int requeues = 0;
-            if (rejectedQueueEntry.makeAcquisitionUnstealable(rejectedQueueEntry.getAcquiringConsumer()))
+            if (messageInstance.makeAcquisitionUnstealable(entry.getConsumer()))
             {
-                requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
+                requeues = messageInstance.routeToAlternate(new Action<MessageInstance>()
                 {
                     @Override
                     public void performAction(final MessageInstance requeueEntry)
@@ -1792,7 +1799,7 @@ public class AMQChannel
             if(requeues == 0)
             {
 
-                final TransactionLogResource owningResource = rejectedQueueEntry.getOwningResource();
+                final TransactionLogResource owningResource = messageInstance.getOwningResource();
                 if(owningResource instanceof Queue)
                 {
                     final Queue<?> queue = (Queue<?>) owningResource;
@@ -2566,19 +2573,19 @@ public class AMQChannel
             _logger.debug("RECV[" + _channelId + "] BasicNack[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " requeue: " + requeue + " ]");
         }
 
-        Map<Long, MessageInstance> nackedMessageMap = new LinkedHashMap<>();
+        Map<Long, UnacknowledgedMessageMap.Entry> nackedMessageMap = new LinkedHashMap<>();
         _unacknowledgedMessageMap.collect(deliveryTag, multiple, nackedMessageMap);
 
-        for(MessageInstance message : nackedMessageMap.values())
+        for(UnacknowledgedMessageMap.Entry unackedEntry : nackedMessageMap.values())
         {
 
-            if (message == null)
+            if (unackedEntry == null)
             {
                 _logger.warn("Ignoring nack request as message is null for tag:" + deliveryTag);
             }
             else
             {
-
+                MessageInstance message = unackedEntry.getMessageInstance();
                 if (message.getMessage() == null)
                 {
                     _logger.warn("Message has already been purged, unable to nack.");

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1771508&r1=1771507&r2=1771508&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Sat Nov 26 18:34:40 2016
@@ -209,27 +209,31 @@ public abstract class ConsumerTarget_0_8
                                consumerTag,
                                filters, creditManager,
                                channel.getClientDeliveryMethod(),
-                               multiQueue);
+                               multiQueue, true);
     }
 
 
-    public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
-                                                     AMQShortString consumerTag, FieldTable filters,
-                                                     FlowCreditManager_0_8 creditManager,
-                                                     ClientDeliveryMethod deliveryMethod)
+    public static ConsumerTarget_0_8 createGetAckTarget(AMQChannel channel,
+                                                        AMQShortString consumerTag, FieldTable filters,
+                                                        FlowCreditManager_0_8 creditManager,
+                                                        ClientDeliveryMethod deliveryMethod)
     {
-        return new AckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, false);
+        return new AckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, false, false);
     }
 
     static final class AckConsumer extends ConsumerTarget_0_8
     {
+        private final boolean _usesCredit;
+
         public AckConsumer(AMQChannel channel,
                            AMQShortString consumerTag, FieldTable filters,
                            FlowCreditManager_0_8 creditManager,
                            ClientDeliveryMethod deliveryMethod,
-                           boolean multiQueue)
+                           boolean multiQueue,
+                           final boolean usesCredit)
         {
             super(channel, consumerTag, filters, creditManager, deliveryMethod, multiQueue);
+            _usesCredit = usesCredit;
         }
 
         /**
@@ -252,7 +256,7 @@ public abstract class ConsumerTarget_0_8
                 long deliveryTag = getChannel().getNextDeliveryTag();
 
                 addUnacknowledgedMessage(entry);
-                getChannel().addUnacknowledgedMessage(entry, deliveryTag, consumer);
+                getChannel().addUnacknowledgedMessage(entry, deliveryTag, consumer, _usesCredit);
                 long size = sendToClient(consumer, entry.getMessage(), entry.getInstanceProperties(), deliveryTag);
                 entry.incrementDeliveryCount();
             }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1771508&r1=1771507&r2=1771508&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Sat Nov 26 18:34:40 2016
@@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.
 import java.util.Collection;
 import java.util.Map;
 
-import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 
 
@@ -42,16 +42,25 @@ public interface UnacknowledgedMessageMa
 
     void visit(Visitor visitor);
 
-    void add(long deliveryTag, MessageInstance message, final ConsumerTarget target);
+    void add(long deliveryTag, MessageInstance message, final ConsumerImpl target, final boolean usesCredit);
 
-    MessageInstance remove(long deliveryTag, final boolean restoreCredit);
+    Entry remove(long deliveryTag, final boolean restoreCredit);
 
     int size();
 
     MessageInstance get(long deliveryTag);
 
     Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
-    void collect(long key, boolean multiple, Map<Long, MessageInstance> msgs);
+    void collect(long key, boolean multiple, Map<Long, Entry> msgs);
+
+    interface Entry
+    {
+        MessageInstance getMessageInstance();
+
+        ConsumerImpl getConsumer();
+
+        long getSize();
+    }
 }
 
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1771508&r1=1771507&r2=1771508&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Sat Nov 26 18:34:40 2016
@@ -27,39 +27,49 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
 class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
 {
-    private static final class MessageTargetPair
+    private static final class EntryImpl implements Entry
     {
         private final MessageInstance _messageInstance;
-        private final ConsumerTarget _target;
+        private final ConsumerImpl _consumer;
+        private final boolean _usesCredit;
 
-        private MessageTargetPair(final MessageInstance messageInstance, final ConsumerTarget target)
+        private EntryImpl(final MessageInstance messageInstance, final ConsumerImpl consumer, final boolean usesCredit)
         {
             _messageInstance = messageInstance;
-            _target = target;
+            _consumer = consumer;
+            _usesCredit = usesCredit;
         }
 
+        @Override
         public MessageInstance getMessageInstance()
         {
             return _messageInstance;
         }
 
-        public ConsumerTarget getTarget()
+        @Override
+        public ConsumerImpl getConsumer()
         {
-            return _target;
+            return _consumer;
         }
 
+        @Override
         public long getSize()
         {
             return _messageInstance.getMessage().getSize();
         }
+
+        public boolean isUsesCredit()
+        {
+            return _usesCredit;
+        }
     }
-    private final Map<Long, MessageTargetPair> _map;
+    private final Map<Long, EntryImpl> _map;
     // we keep this separately as it is accessed by the management thread
     private volatile int _size;
 
@@ -71,7 +81,7 @@ class UnacknowledgedMessageMapImpl imple
         _creditRestorer = creditRestorer;
     }
 
-    public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
+    public void collect(long deliveryTag, boolean multiple, Map<Long, Entry> msgs)
     {
         if (multiple)
         {
@@ -79,7 +89,7 @@ class UnacknowledgedMessageMapImpl imple
         }
         else
         {
-            final MessageInstance entry = get(deliveryTag);
+            final Entry entry = _map.get(deliveryTag);
             if(entry != null)
             {
                 msgs.put(deliveryTag, entry);
@@ -88,40 +98,40 @@ class UnacknowledgedMessageMapImpl imple
 
     }
 
-    public void remove(Map<Long,MessageInstance> msgs)
+    private void remove(Collection<Long> msgs)
     {
-        for (Long deliveryTag : msgs.keySet())
+        for (Long deliveryTag : msgs)
         {
             remove(deliveryTag, true);
         }
     }
 
-    public MessageInstance remove(long deliveryTag, final boolean restoreCredit)
+    public Entry remove(long deliveryTag, final boolean restoreCredit)
     {
-        MessageTargetPair message = _map.remove(deliveryTag);
-        if(message != null)
+        EntryImpl entry = _map.remove(deliveryTag);
+        if(entry != null)
         {
             _size--;
-            if(restoreCredit)
+            if(restoreCredit && entry.isUsesCredit())
             {
-                _creditRestorer.restoreCredit(message.getTarget(), 1, message.getSize());
+                _creditRestorer.restoreCredit(entry.getConsumer().getTarget(), 1, entry.getSize());
             }
         }
-        return message.getMessageInstance();
+        return entry;
     }
 
     public void visit(Visitor visitor)
     {
-        for (Map.Entry<Long, MessageTargetPair> entry : _map.entrySet())
+        for (Map.Entry<Long, EntryImpl> entry : _map.entrySet())
         {
             visitor.callback(entry.getKey(), entry.getValue().getMessageInstance());
         }
         visitor.visitComplete();
     }
 
-    public void add(long deliveryTag, MessageInstance message, final ConsumerTarget target)
+    public void add(long deliveryTag, MessageInstance message, final ConsumerImpl consumer, final boolean usesCredit)
     {
-        if(_map.put(deliveryTag, new MessageTargetPair(message,target)) == null)
+        if(_map.put(deliveryTag, new EntryImpl(message, consumer, usesCredit)) == null)
         {
             _size++;
         }
@@ -138,7 +148,7 @@ class UnacknowledgedMessageMapImpl imple
 
     public MessageInstance get(long key)
     {
-        MessageTargetPair messageTargetPair = _map.get(key);
+        Entry messageTargetPair = _map.get(key);
         return messageTargetPair == null ? null : messageTargetPair.getMessageInstance();
     }
 
@@ -146,13 +156,14 @@ class UnacknowledgedMessageMapImpl imple
     {
         if(multiple)
         {
-            Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<>();
+            Map<Long, Entry> ackedMessageMap = new LinkedHashMap<>();
             collect(deliveryTag, multiple, ackedMessageMap);
-            remove(ackedMessageMap);
+            remove(ackedMessageMap.keySet());
             List<MessageInstance> acknowledged = new ArrayList<>();
-            for (MessageInstance instance : ackedMessageMap.values())
+            for (Entry entry : ackedMessageMap.values())
             {
-                if (instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
+                MessageInstance instance = entry.getMessageInstance();
+                if (instance.makeAcquisitionUnstealable(entry.getConsumer()))
                 {
                     acknowledged.add(instance);
                 }
@@ -162,8 +173,8 @@ class UnacknowledgedMessageMapImpl imple
         else
         {
             MessageInstance instance;
-            instance = remove(deliveryTag, true);
-            if(instance != null && instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
+            final Entry entry = remove(deliveryTag, true);
+            if(entry != null && (instance = entry.getMessageInstance()).makeAcquisitionUnstealable(entry.getConsumer()))
             {
                 return Collections.singleton(instance);
             }
@@ -175,11 +186,11 @@ class UnacknowledgedMessageMapImpl imple
         }
     }
 
-    private void collect(long key, Map<Long, MessageInstance> msgs)
+    private void collect(long key, Map<Long, Entry> msgs)
     {
-        for (Map.Entry<Long, MessageTargetPair> entry : _map.entrySet())
+        for (Map.Entry<Long, EntryImpl> entry : _map.entrySet())
         {
-            msgs.put(entry.getKey(), entry.getValue().getMessageInstance());
+            msgs.put(entry.getKey(), entry.getValue());
             if (entry.getKey() == key)
             {
                 break;

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1771508&r1=1771507&r2=1771508&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java Sat Nov 26 18:34:40 2016
@@ -33,7 +33,6 @@ import org.mockito.stubbing.Answer;
 
 import org.apache.qpid.QpidException;
 import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
@@ -77,7 +76,7 @@ public class ExtractResendAndRequeueTest
         when(_queue.isDeleted()).thenReturn(_queueDeleted);
         _consumer = mock(ConsumerImpl.class);
         when(_consumer.getConsumerNumber()).thenReturn(ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement());
-        ConsumerTarget target = mock(ConsumerTarget.class);
+        ConsumerImpl consumer = mock(ConsumerImpl.class);
 
         long id = 0;
 
@@ -100,7 +99,7 @@ public class ExtractResendAndRequeueTest
                 }
             }).when(entry).delete();
 
-            _unacknowledgedMessageMap.add(id, entry, target);
+            _unacknowledgedMessageMap.add(id, entry, consumer, true);
             _referenceList.add(entry);
             //Increment ID;
             id++;

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1771508&r1=1771507&r2=1771508&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Sat Nov 26 18:34:40 2016
@@ -73,7 +73,7 @@ public class UnacknowledgedMessageMapTes
         for(int i = 0; i < size; i++)
         {
             msgs[i] = createMessageInstance(i);
-            map.add((long)i, msgs[i], null);
+            map.add((long)i, msgs[i], _consumer, true);
         }
         return msgs;
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org