You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/12/05 16:51:51 UTC
qpid-broker-j git commit: QPID-8027: [Broker-J][AMQP 0-8..0-9-1]
Restore previous broker behaviour to ignore invalid delivery tag on message
acknowledgement
Repository: qpid-broker-j
Updated Branches:
refs/heads/master c2412ba4a -> 01bcb51d4
QPID-8027: [Broker-J][AMQP 0-8..0-9-1] Restore previous broker behaviour to ignore invalid delivery tag on message acknowledgement
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/01bcb51d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/01bcb51d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/01bcb51d
Branch: refs/heads/master
Commit: 01bcb51d49d1893f98f2b870ca3962e70ad2d446
Parents: c2412ba
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Dec 5 16:51:05 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Dec 5 16:51:05 2017 +0000
----------------------------------------------------------------------
.../qpid/server/protocol/v0_8/AMQChannel.java | 25 ++++++++++++--
.../v0_8/UnacknowledgedMessageMapImpl.java | 16 ++++-----
.../qpid/tests/protocol/v0_8/BasicTest.java | 34 ++++++++++++++++++++
3 files changed, 65 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/01bcb51d/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index df9f946..d2f071e 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -1736,8 +1736,29 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
Collection<MessageConsumerAssociation> ackedMessages = _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
- final Collection<MessageInstance> messages = Collections2.transform(ackedMessages, MESSAGE_INSTANCE_FUNCTION);
- _transaction.dequeue(messages, new MessageAcknowledgeAction(ackedMessages));
+
+ if (!ackedMessages.isEmpty())
+ {
+ final Collection<MessageInstance> messages =
+ Collections2.transform(ackedMessages, MESSAGE_INSTANCE_FUNCTION);
+ _transaction.dequeue(messages, new MessageAcknowledgeAction(ackedMessages));
+ }
+
+ /*
+ The AMQP 0-9-1 spec requires to raise a channel exception "precondition-failed"
+ when delivery tag is not valid:
+ {quote}
+ The server MUST validate that a non-zero delivery-tag refers to a delivered message, and raise a channel
+ exception if this is not the case. On a transacted channel, this check MUST be done immediately and not
+ delayed until a Tx.Commit. Specifically, a client MUST not acknowledge the same message more than once.
+ {quote}
+
+ The current broker behaviour is spec incompliant but it is kept for backward compatibility.
+ It should close the channel as below:
+
+ if (ackedMessages.isEmpty())
+ closeChannel(ErrorCodes.NOT_ALLOWED, "precondition-failed: Delivery tag '%d' is not valid.");
+ */
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/01bcb51d/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
index 88ac3a5..a8c5709 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.protocol.ErrorCodes;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
@@ -180,16 +181,15 @@ class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
else
{
final MessageConsumerAssociation association = remove(deliveryTag, true);
- final MessageInstance messageInstance = association.getMessageInstance();
- if(association != null && messageInstance.makeAcquisitionUnstealable(association.getConsumer()))
+ if (association != null)
{
- return Collections.singleton(association);
- }
- else
- {
- return Collections.emptySet();
+ final MessageInstance messageInstance = association.getMessageInstance();
+ if (messageInstance != null && messageInstance.makeAcquisitionUnstealable(association.getConsumer()))
+ {
+ return Collections.singleton(association);
+ }
}
-
+ return Collections.emptySet();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/01bcb51d/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
index 7349b8d..1eb0edd 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -158,4 +158,38 @@ public class BasicTest extends BrokerAdminUsingTestBase
assertThat(getBrokerAdmin().getQueueDepthMessages(queueName), is(equalTo(0)));
}
}
+
+ @Test
+ @SpecificationTest(section = "1.8.3.13",
+ description = "The server MUST validate that a non-zero delivery-tag refers to a delivered message,"
+ + " and raise a channel exception if this is not the case. On a transacted channel,"
+ + " this check MUST be done immediately and not delayed until a Tx.Commit. Specifically,"
+ + " a client MUST not acknowledge the same message more than once."
+ + ""
+ + "Note current broker behaviour is spec incompliant: broker ignores not valid delivery tags")
+ public void ackWithInvalidDeliveryTag() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ String consumerTag = "A";
+ final long deliveryTag = 12345L;
+ String queueName = BrokerAdmin.TEST_QUEUE_NAME;
+ interaction.openAnonymousConnection()
+ .channel().open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .basic().qosPrefetchCount(1)
+ .qos()
+ .consumeResponse(BasicQosOkBody.class)
+ .basic().consumeConsumerTag(consumerTag)
+ .consumeQueue(queueName)
+ .consume()
+ .consumeResponse(BasicConsumeOkBody.class)
+ .channel().flow(true)
+ .consumeResponse(ChannelFlowOkBody.class)
+ .basic().ackDeliveryTag(deliveryTag).ack()
+ .channel().close().consumeResponse(ChannelCloseOkBody.class);
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org