You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/06/27 20:08:24 UTC

[qpid-broker-j] branch 7.0.x updated: QPID-8323: [Broker-J] Make sure that the same delivery tags can be used by different links on the same session

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/7.0.x by this push:
     new d082d9a  QPID-8323: [Broker-J] Make sure that the same delivery tags can be used by different links on the same session
d082d9a is described below

commit d082d9a81183f33a7068185877c556849ae97312
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Jun 27 16:46:27 2019 +0100

    QPID-8323: [Broker-J] Make sure that the same delivery tags can be used by different links on the same session
    
    (cherry picked from commit 2909380370a86dc0deb66bedae0245cfcb8274fb)
---
 .../v1_0/AbstractReceivingLinkEndpoint.java        |   2 +-
 .../server/protocol/v1_0/SendingLinkEndpoint.java  |   2 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java     |  44 ++++++--
 .../v1_0/StandardReceivingLinkEndpoint.java        |   5 +
 .../protocol/v1_0/delivery/DeliveryRegistry.java   |   2 +-
 .../v1_0/delivery/DeliveryRegistryImpl.java        |  12 +--
 .../protocol/v1_0/delivery/UnsettledDelivery.java  |  24 +++++
 .../v1_0/delivery/DeliveryRegistryImplTest.java    | 114 +++++++++++++++++++++
 .../v1_0/delivery/UnsettledDeliveryTest.java       |  89 ++++++++++++++++
 .../protocol/v1_0/messaging/TransferTest.java      |  60 +++++++++++
 10 files changed, 334 insertions(+), 20 deletions(-)

diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index 0872eae..c6fa7ce 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -336,7 +336,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
 
             if (outcomeUpdate || settled)
             {
-                getSession().updateDisposition(getRole(), unsettledKeys, state, settled);
+                getSession().updateDisposition(this, deliveryTags, state, settled);
             }
 
             if (settled)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index a506125..5ad5d42 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -632,7 +632,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     {
         if (settled && (_unsettled.remove(deliveryTag) != null))
         {
-            getSession().updateDisposition(getRole(), deliveryTag, state, settled);
+            getSession().updateDisposition(this, deliveryTag, state, settled);
         }
     }
 
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index febfee8..f4f0857 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -248,32 +248,54 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         send(disposition);
     }
 
-    void updateDisposition(final Role role,
+    void updateDisposition(final LinkEndpoint<?,?> linkEndpoint,
                            final Binary deliveryTag,
                            final DeliveryState state,
                            final boolean settled)
     {
-        final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
-        UnsignedInteger deliveryId = deliveryRegistry.getDeliveryIdByTag(deliveryTag);
+        final UnsignedInteger deliveryId = getDeliveryId(deliveryTag, linkEndpoint);
+        updateDisposition(linkEndpoint.getRole(), deliveryId, deliveryId, state, settled);
+    }
+
+    private UnsignedInteger getDeliveryId(final DeliveryRegistry deliveryRegistry,
+                                          final Binary deliveryTag,
+                                          final LinkEndpoint<?, ?> linkEndpoint)
+    {
+        final UnsignedInteger deliveryId = deliveryRegistry.getDeliveryId(deliveryTag, linkEndpoint);
         if (deliveryId == null)
         {
             throw new ConnectionScopedRuntimeException(String.format(
                     "Delivery with tag '%s' is not found in unsettled deliveries", deliveryTag));
         }
-        updateDisposition(role, deliveryId, deliveryId, state, settled);
+        return deliveryId;
     }
 
-    void updateDisposition(final Role role,
+    private SortedSet<UnsignedInteger> getDeliveryIds(final Set<Binary> deliveryTags, final LinkEndpoint<?, ?> linkEndpoint)
+    {
+        final DeliveryRegistry deliveryRegistry = getDeliveryRegistry(linkEndpoint.getRole());
+        return deliveryTags.stream()
+                           .map(deliveryTag -> getDeliveryId(deliveryRegistry, deliveryTag, linkEndpoint))
+                           .collect(Collectors.toCollection(TreeSet::new));
+    }
+
+    private UnsignedInteger getDeliveryId(final Binary deliveryTag, final LinkEndpoint<?, ?> linkEndpoint)
+    {
+        final DeliveryRegistry deliveryRegistry = getDeliveryRegistry(linkEndpoint.getRole());
+        return getDeliveryId(deliveryRegistry, deliveryTag, linkEndpoint);
+    }
+
+    private DeliveryRegistry getDeliveryRegistry(final Role role)
+    {
+        return role == Role.RECEIVER ? getIncomingDeliveryRegistry() : getOutgoingDeliveryRegistry();
+    }
+
+    void updateDisposition(final LinkEndpoint<?,?> linkEndpoint,
                            final Set<Binary> deliveryTags,
                            final DeliveryState state,
                            final boolean settled)
     {
-        final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
-        SortedSet<UnsignedInteger> deliveryIds = deliveryTags.stream()
-                                                             .map(deliveryRegistry::getDeliveryIdByTag)
-                                                             .collect(Collectors.toCollection(TreeSet::new));
-
-        final Iterator<UnsignedInteger> iterator = deliveryIds.iterator();
+        final Role role = linkEndpoint.getRole();
+        final Iterator<UnsignedInteger> iterator = getDeliveryIds(deliveryTags, linkEndpoint).iterator();
         if (iterator.hasNext())
         {
             UnsignedInteger begin = iterator.next();
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index cc689b9..b08d35e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -32,7 +32,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
@@ -46,6 +49,7 @@ import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.plugin.MessageFormat;
 import org.apache.qpid.server.protocol.MessageFormatRegistry;
+import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorRuntimeException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -67,6 +71,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.AsyncCommand;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
index a018824..3127653 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
@@ -30,6 +30,6 @@ public interface DeliveryRegistry
     void removeDelivery(UnsignedInteger deliveryId);
     UnsettledDelivery getDelivery(UnsignedInteger deliveryId);
     void removeDeliveriesForLinkEndpoint(LinkEndpoint<?, ?> linkEndpoint);
-    UnsignedInteger getDeliveryIdByTag(Binary deliveryTag);
+    UnsignedInteger getDeliveryId(Binary deliveryTag, LinkEndpoint<?, ?> linkEndpoint);
     int size();
 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
index bf7fe56..3e264fc 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
@@ -31,13 +31,13 @@ import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 public class DeliveryRegistryImpl implements DeliveryRegistry
 {
     private final Map<UnsignedInteger, UnsettledDelivery> _deliveries = new ConcurrentHashMap<>();
-    private final Map<Binary, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>();
+    private final Map<UnsettledDelivery, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>();
 
     @Override
     public void addDelivery(final UnsignedInteger deliveryId, final UnsettledDelivery unsettledDelivery)
     {
         _deliveries.put(deliveryId, unsettledDelivery);
-        _deliveryIds.put(unsettledDelivery.getDeliveryTag(), deliveryId);
+        _deliveryIds.put(unsettledDelivery, deliveryId);
     }
 
     @Override
@@ -46,7 +46,7 @@ public class DeliveryRegistryImpl implements DeliveryRegistry
         UnsettledDelivery unsettledDelivery = _deliveries.remove(deliveryId);
         if (unsettledDelivery != null)
         {
-            _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+            _deliveryIds.remove(unsettledDelivery);
         }
     }
 
@@ -66,15 +66,15 @@ public class DeliveryRegistryImpl implements DeliveryRegistry
             if (unsettledDelivery.getLinkEndpoint() == linkEndpoint)
             {
                 iterator.remove();
-                _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+                _deliveryIds.remove(unsettledDelivery);
             }
         }
     }
 
     @Override
-    public UnsignedInteger getDeliveryIdByTag(final Binary deliveryTag)
+    public UnsignedInteger getDeliveryId(final Binary deliveryTag, final LinkEndpoint<?, ?> linkEndpoint)
     {
-        return _deliveryIds.get(deliveryTag);
+        return _deliveryIds.get(new UnsettledDelivery(deliveryTag, linkEndpoint));
     }
 
     @Override
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
index 48d2ab1..d581142 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.protocol.v1_0.delivery;
 
+import java.util.Objects;
+
 import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 
@@ -43,4 +45,26 @@ public class UnsettledDelivery
     {
         return _linkEndpoint;
     }
+
+    @Override
+    public boolean equals(final Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+        final UnsettledDelivery that = (UnsettledDelivery) o;
+        return Objects.equals(_deliveryTag, that._deliveryTag) &&
+               Objects.equals(_linkEndpoint, that._linkEndpoint);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(_deliveryTag, _linkEndpoint);
+    }
 }
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java
new file mode 100644
index 0000000..e3a4380
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class DeliveryRegistryImplTest extends QpidTestCase
+{
+    private static final UnsignedInteger DELIVERY_ID = UnsignedInteger.ZERO;
+    private static final UnsignedInteger DELIVERY_ID_2 = UnsignedInteger.ONE;
+    private static final Binary DELIVERY_TAG = new Binary(new byte[]{(byte) 32, (byte) 33});
+    private static final Binary DELIVERY_TAG_2 = new Binary(new byte[]{(byte) 32});
+
+    private DeliveryRegistryImpl _registry;
+    private UnsettledDelivery _unsettledDelivery;
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _registry = new DeliveryRegistryImpl();
+        _unsettledDelivery = new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class));
+    }
+
+    public void testAddDelivery()
+    {
+        assertThat(_registry.size(), is(equalTo(0)));
+
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+
+        assertThat(_registry.size(), is(equalTo(1)));
+    }
+
+    public void testRemoveDelivery()
+    {
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+        assertThat(_registry.size(), is(equalTo(1)));
+        _registry.removeDelivery(DELIVERY_ID);
+        assertThat(_registry.size(), is(equalTo(0)));
+        assertThat(_registry.getDelivery(UnsignedInteger.ZERO), is(nullValue()));
+    }
+
+    public void testGetDelivery()
+    {
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+
+        assertThat(_registry.size(), is(equalTo(1)));
+        final UnsettledDelivery expected =
+                new UnsettledDelivery(_unsettledDelivery.getDeliveryTag(), _unsettledDelivery.getLinkEndpoint());
+        assertThat(_registry.getDelivery(UnsignedInteger.ZERO), is(equalTo(expected)));
+    }
+
+    public void testRemoveDeliveriesForLinkEndpoint()
+    {
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+        _registry.addDelivery(DELIVERY_ID_2, new UnsettledDelivery(DELIVERY_TAG_2, _unsettledDelivery.getLinkEndpoint()));
+        _registry.addDelivery(UnsignedInteger.valueOf(2), new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class)));
+
+        assertThat(_registry.size(), is(equalTo(3)));
+
+        _registry.removeDeliveriesForLinkEndpoint(_unsettledDelivery.getLinkEndpoint());
+
+        assertThat(_registry.size(), is(equalTo(1)));
+    }
+
+    public void testGetDeliveryId()
+    {
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+        _registry.addDelivery(DELIVERY_ID_2, new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class)));
+
+        final UnsignedInteger deliveryId = _registry.getDeliveryId(DELIVERY_TAG, _unsettledDelivery.getLinkEndpoint());
+
+        assertThat(deliveryId, is(equalTo(DELIVERY_ID)));
+    }
+
+    public void testSize()
+    {
+        assertThat(_registry.size(), is(equalTo(0)));
+
+        _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+
+        assertThat(_registry.size(), is(equalTo(1)));
+
+        _registry.removeDelivery(DELIVERY_ID);
+
+        assertThat(_registry.size(), is(equalTo(0)));
+    }
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java
new file mode 100644
index 0000000..5f4d97c
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.util.Objects;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class UnsettledDeliveryTest extends QpidTestCase
+{
+
+    private static final byte[] DATA = new byte[]{(byte) 32, (byte) 33, (byte) 34};
+    private Binary _deliveryTag;
+    private LinkEndpoint<?, ?> _linkEndpoint;
+    private UnsettledDelivery _unsettledDelivery;
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _deliveryTag = new Binary(DATA);
+        _linkEndpoint = mock(LinkEndpoint.class);
+        _unsettledDelivery = new UnsettledDelivery(_deliveryTag, _linkEndpoint);
+    }
+
+    public void testGetDeliveryTag()
+    {
+        assertThat(_unsettledDelivery.getDeliveryTag(), is(equalTo(_deliveryTag)));
+    }
+
+    public void testGetLinkEndpoint()
+    {
+        assertThat(_unsettledDelivery.getLinkEndpoint(), is(equalTo(_linkEndpoint)));
+    }
+
+    public void testEqualsToNewUnsettledDeliveryWithTheSameFields()
+    {
+        assertThat(_unsettledDelivery.equals(new UnsettledDelivery(_deliveryTag, _linkEndpoint)), is(equalTo(true)));
+    }
+
+    public void testEqualsToNewUnsettledDeliveryWithEqualsFields()
+    {
+        assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(DATA), _linkEndpoint)),
+                   is(equalTo(true)));
+    }
+
+    public void testNotEqualsWhenDeliveryTagIsDifferent()
+    {
+        assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(new byte[]{(byte) 32, (byte) 33}),
+                                                                   _linkEndpoint)), is(equalTo(false)));
+    }
+
+    public void testNotEqualsWhenLinkEndpointIsDifferent()
+    {
+        // Reuse the fixture's delivery tag so that the link endpoint is the only differing field.
+        final LinkEndpoint<?, ?> linkEndpoint = mock(LinkEndpoint.class);
+        assertThat(_unsettledDelivery.equals(new UnsettledDelivery(_deliveryTag, linkEndpoint)), is(equalTo(false)));
+    }
+
+    public void testHashCode()
+    {
+        int expected = Objects.hash(_deliveryTag, _linkEndpoint);
+        assertThat(_unsettledDelivery.hashCode(), is(equalTo(expected)));
+    }
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 68d7179..70c479b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.tests.protocol.v1_0.messaging;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -31,6 +32,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
 
@@ -194,6 +196,64 @@ public class TransferTest extends BrokerAdminUsingTestBase
     }
 
     @Test
+    @SpecificationTest(section = "2.6.12 Transferring A Message",
+            description = "The delivery-tag MUST be unique amongst all deliveries"
+                          + " that could be considered unsettled by either end of the link.")
+    public void transferMessagesWithTheSameDeliveryTagOnSeparateLinksBelongingToTheSameSession() throws Exception
+    {
+        try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger link1Handle = UnsignedInteger.ONE;
+            final UnsignedInteger link2Handle = UnsignedInteger.valueOf(2);
+            final Binary deliveryTag = new Binary("deliveryTag".getBytes(StandardCharsets.UTF_8));
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                                     .open().consumeResponse(Open.class)
+                                     .begin().consumeResponse(Begin.class)
+
+                                     .attachName("test1")
+                                     .attachRole(Role.SENDER)
+                                     .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                     .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+                                     .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                     .attachHandle(link1Handle)
+                                     .attach().consumeResponse(Attach.class)
+                                     .consumeResponse(Flow.class)
+
+                                     .attachName("test2")
+                                     .attachHandle(link2Handle)
+                                     .attach().consumeResponse(Attach.class)
+                                     .consumeResponse(Flow.class)
+
+                                     .transferHandle(link1Handle)
+                                     .transferPayloadData("testData")
+                                     .transferDeliveryTag(deliveryTag)
+                                     .transferDeliveryId(UnsignedInteger.ZERO)
+                                     .transfer()
+                                     .transferHandle(link2Handle)
+                                     .transferDeliveryId(UnsignedInteger.ONE)
+                                     .transferPayloadData("testData2")
+                                     .transferDeliveryTag(deliveryTag)
+                                     .transfer();
+
+            final Disposition disposition1 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+            final UnsignedInteger first = disposition1.getFirst();
+            final UnsignedInteger last = disposition1.getLast();
+
+            assertThat(first, anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+            assertThat(last, anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+
+            if (last == null || first.equals(last))
+            {
+                final Disposition disposition2 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+                assertThat(disposition2.getFirst(), anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+                assertThat(disposition2.getLast(), anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+                assertThat(disposition2.getFirst(), is(not(equalTo(first))));
+            }
+        }
+    }
+
+    @Test
     @SpecificationTest(section = "2.7.5",
             description = "If first, this indicates that the receiver MUST settle the delivery once it has arrived without waiting for the sender to settle first")
     public void transferReceiverSettleModeFirst() throws Exception


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org