You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/06/27 16:22:46 UTC

[qpid-broker-j] branch master updated (4a2ea8a -> 41edc4b)

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.


    from 4a2ea8a  QPID-8316: Add licence header into new test class
     new 2909380  QPID-8323: [Broker-J] Make sure that the same delivery tags can be used by different links on the same session
     new 41edc4b  QPID-8328: [AMQP 1.0] Add link stealing tests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../v1_0/AbstractReceivingLinkEndpoint.java        |   2 +-
 .../server/protocol/v1_0/SendingLinkEndpoint.java  |   6 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java     |  44 +++++--
 .../v1_0/StandardReceivingLinkEndpoint.java        |   5 +
 .../protocol/v1_0/delivery/DeliveryRegistry.java   |   2 +-
 .../v1_0/delivery/DeliveryRegistryImpl.java        |  12 +-
 .../protocol/v1_0/delivery/UnsettledDelivery.java  |  24 ++++
 .../v1_0/delivery/DeliveryRegistryImplTest.java    | 123 +++++++++++++++++++
 .../v1_0/delivery/UnsettledDeliveryTest.java       |  99 +++++++++++++++
 .../qpid/tests/protocol/v1_0/Interaction.java      |  19 +++
 .../protocol/v1_0/messaging/TransferTest.java      |  62 +++++++++-
 .../v1_0/transport/link/LinkStealingTest.java      | 136 +++++++++++++++++++++
 12 files changed, 508 insertions(+), 26 deletions(-)
 create mode 100644 broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java
 create mode 100644 broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java
 create mode 100644 systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 02/02: QPID-8328: [AMQP 1.0] Add link stealing tests

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 41edc4b476f5c9b0483d640392ed79d0d779a4d5
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Jun 27 17:07:55 2019 +0100

    QPID-8328: [AMQP 1.0] Add link stealing tests
---
 .../qpid/tests/protocol/v1_0/Interaction.java      |  19 +++
 .../v1_0/transport/link/LinkStealingTest.java      | 136 +++++++++++++++++++++
 2 files changed, 155 insertions(+)

diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 5570966..ac30068 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -1144,4 +1144,23 @@ public class Interaction extends AbstractInteraction<Interaction>
         sendPerformative(EMPTY_FRAME, UnsignedShort.ZERO);
         return this;
     }
+
+    public <T> T consume(final Class<T> expected, final Class<?>... ignore)
+            throws Exception
+    {
+        final Class<?>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1);
+        expectedResponses[ignore.length] = expected;
+
+        T completed = null;
+        do
+        {
+            Response<?> response = consumeResponse(expectedResponses).getLatestResponse();
+            if (expected.isAssignableFrom(response.getBody().getClass()))
+            {
+                completed = (T) response.getBody();
+            }
+        }
+        while (completed == null);
+        return completed;
+    }
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
new file mode 100644
index 0000000..260a3bc
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.transport.link;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class LinkStealingTest extends BrokerAdminUsingTestBase
+{
+    @Test
+    @SpecificationTest(section = "2.6.1. Naming a link",
+                       description = "Consequently, a link can only be active in one connection at a time."
+                                     + " If an attempt is made to attach the link subsequently when it is not suspended,"
+                                     + " then the link can be ’stolen’, i.e., the second attach succeeds and the first"
+                                     + " attach MUST then be closed with a link error of stolen. This behavior ensures"
+                                     + " that in the event of a connection failure occurring and being noticed"
+                                     + " by one party, that re-establishment has the desired effect.")
+    @Ignore("QPID-8328: Broker erroneously ends the session with internal error")
+    public void subsequentAttachOnTheSameSession() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final Attach responseAttach = interaction
+                    .negotiateProtocol().consumeResponse()
+                    .open().consumeResponse(Open.class)
+                    .begin().consumeResponse(Begin.class)
+                    .attachRole(Role.SENDER)
+                    .attachInitialDeliveryCount(UnsignedInteger.ZERO)
+                    .attachHandle(UnsignedInteger.ZERO)
+                    .attach().consumeResponse()
+                    .getLatestResponse(Attach.class);
+            assertThat(responseAttach.getName(), is(notNullValue()));
+            assertThat(responseAttach.getHandle().longValue(), is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach.getRole(), is(Role.RECEIVER));
+            assertThat(responseAttach.getTarget(), is(notNullValue()));
+            assertThat(responseAttach.getSource(), is(notNullValue()));
+
+            Detach stolenDetach = interaction.consumeResponse(Flow.class)
+                                             .attachHandle(UnsignedInteger.ONE)
+                                             .attach()
+                                             .consume(Detach.class, Attach.class, Flow.class);
+
+            assertThat(stolenDetach.getHandle().longValue(), is(equalTo(responseAttach.getHandle().longValue())));
+            assertThat(stolenDetach.getError(), is(notNullValue()));
+            assertThat(stolenDetach.getError().getCondition(), is(equalTo(LinkError.STOLEN)));
+        }
+    }
+
+
+
+    @Test
+    @SpecificationTest(section = "2.6.1. Naming a link", description = "")
+    public void subsequentAttachOnDifferentSessions() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final String linkName = "test";
+            final Attach responseAttach = interaction.negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse(Open.class)
+                                                     .begin().consumeResponse(Begin.class)
+                                                     .attachRole(Role.SENDER)
+                                                     .attachName(linkName)
+                                                     .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .attach().consumeResponse()
+                                                     .getLatestResponse(Attach.class);
+            assertThat(responseAttach.getName(), is(notNullValue()));
+            assertThat(responseAttach.getHandle().longValue(),
+                       is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach.getRole(), is(Role.RECEIVER));
+
+            interaction.consumeResponse(Flow.class);
+
+            final Detach stolenDetach = interaction.sessionChannel(UnsignedShort.valueOf(2))
+                                                   .begin().consumeResponse(Begin.class)
+                                                   .attachRole(Role.SENDER)
+                                                   .attachName(linkName)
+                                                   .attachInitialDeliveryCount(UnsignedInteger.ZERO)
+                                                   .attach()
+                                                   .consume(Detach.class, Attach.class, Flow.class, Disposition.class);
+
+            assertThat(stolenDetach.getHandle().longValue(), is(equalTo(responseAttach.getHandle().longValue())));
+            assertThat(stolenDetach.getError(), is(notNullValue()));
+            assertThat(stolenDetach.getError().getCondition(), is(equalTo(LinkError.STOLEN)));
+
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 01/02: QPID-8323: [Broker-J] Make sure that the same delivery tags can be used by different links on the same session

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 2909380370a86dc0deb66bedae0245cfcb8274fb
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Jun 27 16:46:27 2019 +0100

    QPID-8323: [Broker-J] Make sure that the same delivery tags can be used by different links on the same session
---
 .../v1_0/AbstractReceivingLinkEndpoint.java        |   2 +-
 .../server/protocol/v1_0/SendingLinkEndpoint.java  |   6 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java     |  44 ++++++--
 .../v1_0/StandardReceivingLinkEndpoint.java        |   5 +
 .../protocol/v1_0/delivery/DeliveryRegistry.java   |   2 +-
 .../v1_0/delivery/DeliveryRegistryImpl.java        |  12 +-
 .../protocol/v1_0/delivery/UnsettledDelivery.java  |  24 ++++
 .../v1_0/delivery/DeliveryRegistryImplTest.java    | 123 +++++++++++++++++++++
 .../v1_0/delivery/UnsettledDeliveryTest.java       |  99 +++++++++++++++++
 .../protocol/v1_0/messaging/TransferTest.java      |  62 ++++++++++-
 10 files changed, 353 insertions(+), 26 deletions(-)

diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index 3b922e4..f346dbe 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -341,7 +341,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
 
             if (outcomeUpdate || settled)
             {
-                getSession().updateDisposition(getRole(), unsettledKeys, state, settled);
+                getSession().updateDisposition(this, deliveryTags, state, settled);
             }
 
             if (settled)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 2fb7421..5081d14 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -24,7 +24,6 @@ package org.apache.qpid.server.protocol.v1_0;
 import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -50,10 +49,7 @@ import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
-import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -648,7 +644,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     {
         if (settled && (_unsettled.remove(deliveryTag) != null))
         {
-            getSession().updateDisposition(getRole(), deliveryTag, state, settled);
+            getSession().updateDisposition(this, deliveryTag, state, settled);
         }
     }
 
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index cd8eb83..931cf95 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -248,32 +248,54 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         send(disposition);
     }
 
-    void updateDisposition(final Role role,
+    void updateDisposition(final LinkEndpoint<?,?> linkEndpoint,
                            final Binary deliveryTag,
                            final DeliveryState state,
                            final boolean settled)
     {
-        final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
-        UnsignedInteger deliveryId = deliveryRegistry.getDeliveryIdByTag(deliveryTag);
+        final UnsignedInteger deliveryId = getDeliveryId(deliveryTag, linkEndpoint);
+        updateDisposition(linkEndpoint.getRole(), deliveryId, deliveryId, state, settled);
+    }
+
+    private UnsignedInteger getDeliveryId(final DeliveryRegistry deliveryRegistry,
+                                          final Binary deliveryTag,
+                                          final LinkEndpoint<?, ?> linkEndpoint)
+    {
+        final UnsignedInteger deliveryId = deliveryRegistry.getDeliveryId(deliveryTag, linkEndpoint);
         if (deliveryId == null)
         {
             throw new ConnectionScopedRuntimeException(String.format(
                     "Delivery with tag '%s' is not found in unsettled deliveries", deliveryTag));
         }
-        updateDisposition(role, deliveryId, deliveryId, state, settled);
+        return deliveryId;
     }
 
-    void updateDisposition(final Role role,
+    private SortedSet<UnsignedInteger> getDeliveryIds(final Set<Binary> deliveryTags, final LinkEndpoint<?, ?> linkEndpoint)
+    {
+        final DeliveryRegistry deliveryRegistry = getDeliveryRegistry(linkEndpoint.getRole());
+        return deliveryTags.stream()
+                           .map(deliveryTag -> getDeliveryId(deliveryRegistry, deliveryTag, linkEndpoint))
+                           .collect(Collectors.toCollection(TreeSet::new));
+    }
+
+    private UnsignedInteger getDeliveryId(final Binary deliveryTag, final LinkEndpoint<?, ?> linkEndpoint)
+    {
+        final DeliveryRegistry deliveryRegistry = getDeliveryRegistry(linkEndpoint.getRole());
+        return getDeliveryId(deliveryRegistry, deliveryTag, linkEndpoint);
+    }
+
+    private DeliveryRegistry getDeliveryRegistry(final Role role)
+    {
+        return role == Role.RECEIVER ? getIncomingDeliveryRegistry() : getOutgoingDeliveryRegistry();
+    }
+
+    void updateDisposition(final LinkEndpoint<?,?> linkEndpoint,
                            final Set<Binary> deliveryTags,
                            final DeliveryState state,
                            final boolean settled)
     {
-        final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
-        SortedSet<UnsignedInteger> deliveryIds = deliveryTags.stream()
-                                                             .map(deliveryRegistry::getDeliveryIdByTag)
-                                                             .collect(Collectors.toCollection(TreeSet::new));
-
-        final Iterator<UnsignedInteger> iterator = deliveryIds.iterator();
+        final Role role = linkEndpoint.getRole();
+        final Iterator<UnsignedInteger> iterator = getDeliveryIds(deliveryTags, linkEndpoint).iterator();
         if (iterator.hasNext())
         {
             UnsignedInteger begin = iterator.next();
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 9be4dcc..bc7d4b0 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -32,7 +32,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
@@ -46,6 +49,7 @@ import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.MessageFormatRegistry;
+import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorRuntimeException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -67,6 +71,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.AsyncCommand;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
index a018824..3127653 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
@@ -30,6 +30,6 @@ public interface DeliveryRegistry
     void removeDelivery(UnsignedInteger deliveryId);
     UnsettledDelivery getDelivery(UnsignedInteger deliveryId);
     void removeDeliveriesForLinkEndpoint(LinkEndpoint<?, ?> linkEndpoint);
-    UnsignedInteger getDeliveryIdByTag(Binary deliveryTag);
+    UnsignedInteger getDeliveryId(Binary deliveryTag, LinkEndpoint<?, ?> linkEndpoint);
     int size();
 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
index bf7fe56..3e264fc 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
@@ -31,13 +31,13 @@ import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 public class DeliveryRegistryImpl implements DeliveryRegistry
 {
     private final Map<UnsignedInteger, UnsettledDelivery> _deliveries = new ConcurrentHashMap<>();
-    private final Map<Binary, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>();
+    private final Map<UnsettledDelivery, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>();
 
     @Override
     public void addDelivery(final UnsignedInteger deliveryId, final UnsettledDelivery unsettledDelivery)
     {
         _deliveries.put(deliveryId, unsettledDelivery);
-        _deliveryIds.put(unsettledDelivery.getDeliveryTag(), deliveryId);
+        _deliveryIds.put(unsettledDelivery, deliveryId);
     }
 
     @Override
@@ -46,7 +46,7 @@ public class DeliveryRegistryImpl implements DeliveryRegistry
         UnsettledDelivery unsettledDelivery = _deliveries.remove(deliveryId);
         if (unsettledDelivery != null)
         {
-            _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+            _deliveryIds.remove(unsettledDelivery);
         }
     }
 
@@ -66,15 +66,15 @@ public class DeliveryRegistryImpl implements DeliveryRegistry
             if (unsettledDelivery.getLinkEndpoint() == linkEndpoint)
             {
                 iterator.remove();
-                _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+                _deliveryIds.remove(unsettledDelivery);
             }
         }
     }
 
     @Override
-    public UnsignedInteger getDeliveryIdByTag(final Binary deliveryTag)
+    public UnsignedInteger getDeliveryId(final Binary deliveryTag, final LinkEndpoint<?, ?> linkEndpoint)
     {
-        return _deliveryIds.get(deliveryTag);
+        return _deliveryIds.get(new UnsettledDelivery(deliveryTag, linkEndpoint));
     }
 
     @Override
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
index 48d2ab1..d581142 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v1_0.delivery;
 
+import java.util.Objects;
+
 import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 
@@ -43,4 +45,26 @@ public class UnsettledDelivery
     {
         return _linkEndpoint;
     }
+
+    @Override
+    public boolean equals(final Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+        final UnsettledDelivery that = (UnsettledDelivery) o;
+        return Objects.equals(_deliveryTag, that._deliveryTag) &&
+               Objects.equals(_linkEndpoint, that._linkEndpoint);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(_deliveryTag, _linkEndpoint);
+    }
 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java
new file mode 100644
index 0000000..54e7fb4
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class DeliveryRegistryImplTest extends UnitTestBase
+{
+    private static final UnsignedInteger DELIVERY_ID = UnsignedInteger.ZERO;
+    private static final UnsignedInteger DELIVERY_ID_2 = UnsignedInteger.ONE;
+    private static final Binary DELIVERY_TAG = new Binary(new byte[]{(byte) 32, (byte) 33});
+    private static final Binary DELIVERY_TAG_2 = new Binary(new byte[]{(byte) 32});
+
+    private DeliveryRegistryImpl _registry;
+    private UnsettledDelivery _unsettledDelivery;
+
+    @Before
+    public void setUp()
+    {
+        _registry = new DeliveryRegistryImpl();
+        _unsettledDelivery = new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class));
+    }
+
+    @Test
+    public void addDelivery()
+    {
+        assertThat(_registry.size(), is(equalTo(0)));
+
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+
+        assertThat(_registry.size(), is(equalTo(1)));
+    }
+
+    @Test
+    public void removeDelivery()
+    {
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+        assertThat(_registry.size(), is(equalTo(1)));
+        _registry.removeDelivery(DELIVERY_ID);
+        assertThat(_registry.size(), is(equalTo(0)));
+        assertThat(_registry.getDelivery(UnsignedInteger.ZERO), is(nullValue()));
+    }
+
+    @Test
+    public void getDelivery()
+    {
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+
+        assertThat(_registry.size(), is(equalTo(1)));
+        final UnsettledDelivery expected =
+                new UnsettledDelivery(_unsettledDelivery.getDeliveryTag(), _unsettledDelivery.getLinkEndpoint());
+        assertThat(_registry.getDelivery(UnsignedInteger.ZERO), is(equalTo(expected)));
+    }
+
+    @Test
+    public void removeDeliveriesForLinkEndpoint()
+    {
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+        _registry.addDelivery(DELIVERY_ID_2, new UnsettledDelivery(DELIVERY_TAG_2, _unsettledDelivery.getLinkEndpoint()));
+        _registry.addDelivery(UnsignedInteger.valueOf(2), new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class)));
+
+        assertThat(_registry.size(), is(equalTo(3)));
+
+        _registry.removeDeliveriesForLinkEndpoint(_unsettledDelivery.getLinkEndpoint());
+
+        assertThat(_registry.size(), is(equalTo(1)));
+    }
+
+    @Test
+    public void getDeliveryId()
+    {
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+        _registry.addDelivery(DELIVERY_ID_2, new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class)));
+
+        final UnsignedInteger deliveryId = _registry.getDeliveryId(DELIVERY_TAG, _unsettledDelivery.getLinkEndpoint());
+
+        assertThat(deliveryId, is(equalTo(DELIVERY_ID)));
+    }
+
+    @Test
+    public void size()
+    {
+        assertThat(_registry.size(), is(equalTo(0)));
+
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+
+        assertThat(_registry.size(), is(equalTo(1)));
+
+        _registry.removeDelivery(DELIVERY_ID);
+
+        assertThat(_registry.size(), is(equalTo(0)));
+    }
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java
new file mode 100644
index 0000000..51195c6
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import java.util.Objects;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class UnsettledDeliveryTest extends UnitTestBase
+{
+
+    private static final byte[] DATA = new byte[]{(byte) 32, (byte) 33, (byte) 34};
+    private Binary _deliveryTag;
+    private LinkEndpoint<?, ?> _linkEndpoint;
+    private UnsettledDelivery _unsettledDelivery;
+
+    @Before
+    public void setUp()
+    {
+        _deliveryTag = new Binary(DATA);
+        _linkEndpoint = mock(LinkEndpoint.class);
+        _unsettledDelivery = new UnsettledDelivery(_deliveryTag, _linkEndpoint);
+    }
+
+    @Test
+    public void testGetDeliveryTag()
+    {
+        assertThat(_unsettledDelivery.getDeliveryTag(), is(equalTo(_deliveryTag)));
+    }
+
+    @Test
+    public void testGetLinkEndpoint()
+    {
+        assertThat(_unsettledDelivery.getLinkEndpoint(), is(equalTo(_linkEndpoint)));
+    }
+
+    @Test
+    public void testEqualsToNewUnsettledDeliveryWithTheSameFields()
+    {
+        assertThat(_unsettledDelivery.equals(new UnsettledDelivery(_deliveryTag, _linkEndpoint)), is(equalTo(true)));
+    }
+
+    @Test
+    public void testEqualsToNewUnsettledDeliveryWithEqualsFields()
+    {
+        assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(DATA), _linkEndpoint)),
+                   is(equalTo(true)));
+    }
+
+    @Test
+    public void testNotEqualsWhenDeliveryTagIsDifferent()
+    {
+        assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(new byte[]{(byte) 32, (byte) 33}),
+                                                                   _linkEndpoint)), is(equalTo(false)));
+    }
+
+    @Test
+    public void testNotEqualsWhenLinkEndpointIsDifferent()
+    {
+        final LinkEndpoint<?, ?> linkEndpoint = mock(LinkEndpoint.class);
+        assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(new byte[]{(byte) 32, (byte) 33}),
+                                                                   linkEndpoint)), is(equalTo(false)));
+    }
+
+    @Test
+    public void testHashCode()
+    {
+        int expected = Objects.hash(_deliveryTag, _linkEndpoint);
+        assertThat(_unsettledDelivery.hashCode(), is(equalTo(expected)));
+    }
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 7d1b1b6..da7f82a 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.tests.protocol.v1_0.messaging;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -31,8 +32,8 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeFalse;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
@@ -79,7 +80,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.util.SystemUtils;
 import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
@@ -196,6 +196,64 @@ public class TransferTest extends BrokerAdminUsingTestBase
     }
 
     @Test
+    @SpecificationTest(section = "2.6.12 Transferring A Message",
+            description = "The delivery-tag MUST be unique amongst all deliveries"
+                          + " that could be considered unsettled by either end of the link.")
+    public void transferMessagesWithTheSameDeliveryTagOnSeparateLinksBelongingToTheSameSession() throws Exception
+    {
+        try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger link1Handle = UnsignedInteger.ONE;
+            final UnsignedInteger link2Handle = UnsignedInteger.valueOf(2);
+            final Binary deliveryTag = new Binary("deliveryTag".getBytes(StandardCharsets.UTF_8));
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                                     .open().consumeResponse(Open.class)
+                                     .begin().consumeResponse(Begin.class)
+
+                                     .attachName("test1")
+                                     .attachRole(Role.SENDER)
+                                     .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                     .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+                                     .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                     .attachHandle(link1Handle)
+                                     .attach().consumeResponse(Attach.class)
+                                     .consumeResponse(Flow.class)
+
+                                     .attachName("test2")
+                                     .attachHandle(link2Handle)
+                                     .attach().consumeResponse(Attach.class)
+                                     .consumeResponse(Flow.class)
+
+                                     .transferHandle(link1Handle)
+                                     .transferPayloadData("testData")
+                                     .transferDeliveryTag(deliveryTag)
+                                     .transferDeliveryId(UnsignedInteger.ZERO)
+                                     .transfer()
+                                     .transferHandle(link2Handle)
+                                     .transferDeliveryId(UnsignedInteger.ONE)
+                                     .transferPayloadData("testData2")
+                                     .transferDeliveryTag(deliveryTag)
+                                     .transfer();
+
+            final Disposition disposition1 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+            final UnsignedInteger first = disposition1.getFirst();
+            final UnsignedInteger last = disposition1.getLast();
+
+            assertThat(first, anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+            assertThat(last, anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+
+            if (last == null || first.equals(last))
+            {
+                final Disposition disposition2 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+                assertThat(disposition2.getFirst(), anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+                assertThat(disposition2.getLast(), anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+                assertThat(disposition2.getFirst(), is(not(equalTo(first))));
+            }
+        }
+    }
+
+    @Test
     @SpecificationTest(section = "2.7.5",
             description = "If first, this indicates that the receiver MUST settle the delivery once it has arrived without waiting for the sender to settle first")
     public void transferReceiverSettleModeFirst() throws Exception


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org