You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/29 13:19:21 UTC

qpid-broker-j git commit: QPID-7842: Disallow sending of a transfer whose delivery tag is not unique amongst all deliveries that could be considered unsettled by either end of the link

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 498ba2cfb -> 0598f7ebc


QPID-7842: Disallow sending of a transfer whose delivery tag is not unique amongst all deliveries that could be considered unsettled by either end of the link


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0598f7eb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0598f7eb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0598f7eb

Branch: refs/heads/master
Commit: 0598f7ebc982a2c534be960a8c7c06ab4bfa1002
Parents: 498ba2c
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Jun 29 13:37:31 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Jun 29 13:37:31 2017 +0100

----------------------------------------------------------------------
 .../v1_0/AbstractReceivingLinkEndpoint.java     |  8 +++
 .../v1_0/type/ErrorCarryingFrameBody.java       | 28 +++++++++
 .../protocol/v1_0/type/transport/Close.java     |  4 +-
 .../protocol/v1_0/type/transport/Detach.java    |  4 +-
 .../protocol/v1_0/type/transport/End.java       |  4 +-
 .../protocol/v1_0/messaging/TransferTest.java   | 66 +++++++++++++++++++-
 6 files changed, 107 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0598f7eb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index d159de5..e87fffb 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -176,6 +176,14 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
             error = new Error(AmqpError.INVALID_FIELD,
                                     "Transfer \"delivery-tag\" is required for a new delivery.");
         }
+        else if (_unsettled.containsKey(transfer.getDeliveryTag()))
+        {
+            error = new Error(AmqpError.ILLEGAL_STATE,
+                              String.format("Delivery-tag '%s' is used by another unsettled delivery."
+                                            + " The delivery-tag MUST be unique amongst all deliveries that"
+                                            + " could be considered unsettled by either end of the link.",
+                                            transfer.getDeliveryTag()));
+        }
         return error;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0598f7eb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/ErrorCarryingFrameBody.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/ErrorCarryingFrameBody.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/ErrorCarryingFrameBody.java
new file mode 100644
index 0000000..79649b3
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/ErrorCarryingFrameBody.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.type;
+
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+
+public interface ErrorCarryingFrameBody extends FrameBody // A FrameBody that exposes an Error; implemented by the transport frames Close, Detach and End.
+{
+    Error getError(); // Returns the Error attached to this frame body. NOTE(review): whether null is allowed is not visible here - confirm against implementers.
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0598f7eb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Close.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Close.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Close.java
index 55f700c..5fcee18 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Close.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Close.java
@@ -25,9 +25,9 @@ package org.apache.qpid.server.protocol.v1_0.type.transport;
 
 import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
 import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 
-public class Close implements FrameBody
+public class Close implements ErrorCarryingFrameBody
 {
 
     @CompositeTypeField

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0598f7eb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Detach.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Detach.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Detach.java
index 02fb1d8..bccc719 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Detach.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Detach.java
@@ -25,10 +25,10 @@ package org.apache.qpid.server.protocol.v1_0.type.transport;
 
 import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
 import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 
-public class Detach implements FrameBody
+public class Detach implements ErrorCarryingFrameBody
 {
 
     @CompositeTypeField(mandatory = true)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0598f7eb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/End.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/End.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/End.java
index ca8b375..6c60f1b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/End.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/End.java
@@ -25,9 +25,9 @@ package org.apache.qpid.server.protocol.v1_0.type.transport;
 
 import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
 import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 
-public class End implements FrameBody
+public class End implements ErrorCarryingFrameBody
 {
 
     @CompositeTypeField

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0598f7eb/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 6ff5d5f..0d375e4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -25,7 +25,10 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
@@ -39,6 +42,8 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -70,7 +75,7 @@ import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
 
 public class TransferTest extends ProtocolTestBase
 {
-    public static final String TEST_MESSAGE_DATA = "foo";
+    private static final String TEST_MESSAGE_DATA = "foo";
     private InetSocketAddress _brokerAddress;
     private String _originalMmsMessageStorePersistence;
 
@@ -650,4 +655,63 @@ public class TransferTest extends ProtocolTestBase
 
         }
     }
+
+    @Test
+    @SpecificationTest(section = "2.7.5",
+            description = "[delivery-tag] uniquely identifies the delivery attempt for a given message on this link.")
+    public void transfersWithDuplicateDeliveryTag() throws Exception // Sends two transfers sharing one delivery-tag and waits for a frame that carries an error.
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(UTF_8)); // the same tag is reused for both transfers below
+
+            Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class)
+                       .attachRole(Role.SENDER) // the test client attaches as the sending end of the link
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND) // NOTE(review): presumably keeps the first delivery unsettled on the peer - confirm against AMQP 1.0 rcv-settle-mode
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            Flow flow = interaction.getLatestResponse(Flow.class);
+            assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1))); // two transfers are sent, so more than one credit is required
+
+            interaction.transferDeliveryId(UnsignedInteger.ZERO) // first transfer: delivery-id 0
+                       .transferDeliveryTag(deliveryTag)
+                       .transferPayloadData("test")
+                       .transfer()
+
+                       .transferDeliveryTag(deliveryTag) // second transfer: same delivery-tag, but delivery-id 1
+                       .transferDeliveryId(UnsignedInteger.ONE)
+                       .transferPayloadData("test2")
+                       .transfer();
+
+            do
+            {
+                interaction.consumeResponse(); // read the next frame sent back by the peer
+                Response<?> response = interaction.getLatestResponse();
+                assertThat(response, is(notNullValue()));
+
+                Object body =  response.getBody();
+                if (body instanceof ErrorCarryingFrameBody) // expected outcome: a frame that can carry an error
+                {
+                    Error error = ((ErrorCarryingFrameBody) body).getError();
+                    assertThat(error, is(notNullValue())); // the frame must actually contain an error
+                    break; // success - stop reading responses
+                }
+                else if (body instanceof Disposition) // a disposition is tolerated only if unsettled and not bounded by delivery-id 1
+                {
+                    Disposition disposition = (Disposition)body;
+                    assertThat(disposition.getSettled(), is(equalTo(false)));
+                    assertThat(disposition.getFirst(), is(not(equalTo(UnsignedInteger.ONE)))); // delivery-id 1 is the duplicate-tag transfer
+                    assertThat(disposition.getLast(), is(not(equalTo(UnsignedInteger.ONE))));
+                }
+                else if (!(body instanceof Flow)) // Flow frames are ignored; any other frame type fails the test
+                {
+                    fail("Unexpected response " + body);
+                }
+            } while (true); // the loop is left only through the break above or a thrown assertion failure/exception
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org