You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/12/05 16:51:51 UTC

qpid-broker-j git commit: QPID-8027: [Broker-J][AMQP 0-8..0-9-1] Restore previous broker behaviour to ignore invalid delivery tag on message acknowledgement

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master c2412ba4a -> 01bcb51d4


QPID-8027: [Broker-J][AMQP 0-8..0-9-1] Restore previous broker behaviour to ignore invalid delivery tag on message acknowledgement


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/01bcb51d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/01bcb51d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/01bcb51d

Branch: refs/heads/master
Commit: 01bcb51d49d1893f98f2b870ca3962e70ad2d446
Parents: c2412ba
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Dec 5 16:51:05 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Dec 5 16:51:05 2017 +0000

----------------------------------------------------------------------
 .../qpid/server/protocol/v0_8/AMQChannel.java   | 25 ++++++++++++--
 .../v0_8/UnacknowledgedMessageMapImpl.java      | 16 ++++-----
 .../qpid/tests/protocol/v0_8/BasicTest.java     | 34 ++++++++++++++++++++
 3 files changed, 65 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/01bcb51d/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index df9f946..d2f071e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1736,8 +1736,29 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
         }
 
         Collection<MessageConsumerAssociation> ackedMessages = _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
-        final Collection<MessageInstance> messages = Collections2.transform(ackedMessages, MESSAGE_INSTANCE_FUNCTION);
-        _transaction.dequeue(messages, new MessageAcknowledgeAction(ackedMessages));
+
+        if (!ackedMessages.isEmpty())
+        {
+            final Collection<MessageInstance> messages =
+                    Collections2.transform(ackedMessages, MESSAGE_INSTANCE_FUNCTION);
+            _transaction.dequeue(messages, new MessageAcknowledgeAction(ackedMessages));
+        }
+
+        /*
+        The AMQP 0-9-1 spec requires to raise a channel exception "precondition-failed"
+        when delivery tag is not valid:
+        {quote}
+          The server MUST validate that a non-zero delivery-tag refers to a delivered message, and raise a channel
+          exception if this is not the case. On a transacted channel, this check MUST be done immediately and not
+          delayed until a Tx.Commit. Specifically, a client MUST not acknowledge the same message more than once.
+        {quote}
+
+        The current broker behaviour is spec incompliant but it is kept for backward compatibility.
+        It should close the channel as below:
+
+        if (ackedMessages.isEmpty())
+            closeChannel(ErrorCodes.NOT_ALLOWED, "precondition-failed: Delivery tag '%d' is not valid.");
+        */
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/01bcb51d/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
index 88ac3a5..a8c5709 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
@@ -29,6 +29,7 @@ import java.util.Map;
 
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.protocol.ErrorCodes;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
 class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
@@ -180,16 +181,15 @@ class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
         else
         {
             final MessageConsumerAssociation association = remove(deliveryTag, true);
-            final MessageInstance messageInstance = association.getMessageInstance();
-            if(association != null && messageInstance.makeAcquisitionUnstealable(association.getConsumer()))
+            if (association != null)
             {
-                return Collections.singleton(association);
-            }
-            else
-            {
-                return Collections.emptySet();
+                final MessageInstance messageInstance = association.getMessageInstance();
+                if (messageInstance != null && messageInstance.makeAcquisitionUnstealable(association.getConsumer()))
+                {
+                    return Collections.singleton(association);
+                }
             }
-
+            return Collections.emptySet();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/01bcb51d/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
index 7349b8d..1eb0edd 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -158,4 +158,38 @@ public class BasicTest extends BrokerAdminUsingTestBase
             assertThat(getBrokerAdmin().getQueueDepthMessages(queueName), is(equalTo(0)));
         }
     }
+
+    @Test
+    @SpecificationTest(section = "1.8.3.13",
+            description = "The server MUST validate that a non-zero delivery-tag refers to a delivered message,"
+                          + " and raise a channel exception if this is not the case. On a transacted channel,"
+                          + " this check MUST be done immediately and not delayed until a Tx.Commit. Specifically,"
+                          + " a client MUST not acknowledge the same message more than once."
+                          + ""
+                          + "Note current broker behaviour is spec incompliant: broker ignores not valid delivery tags")
+    public void ackWithInvalidDeliveryTag() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String consumerTag = "A";
+            final long deliveryTag = 12345L;
+            String queueName = BrokerAdmin.TEST_QUEUE_NAME;
+            interaction.openAnonymousConnection()
+                       .channel().open()
+                       .consumeResponse(ChannelOpenOkBody.class)
+                       .basic().qosPrefetchCount(1)
+                       .qos()
+                       .consumeResponse(BasicQosOkBody.class)
+                       .basic().consumeConsumerTag(consumerTag)
+                       .consumeQueue(queueName)
+                       .consume()
+                       .consumeResponse(BasicConsumeOkBody.class)
+                       .channel().flow(true)
+                       .consumeResponse(ChannelFlowOkBody.class)
+                       .basic().ackDeliveryTag(deliveryTag).ack()
+                       .channel().close().consumeResponse(ChannelCloseOkBody.class);
+        }
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org