You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/06/27 16:38:23 UTC
[qpid-broker-j] branch 7.1.x updated: QPID-8323: [Broker-J] Make
sure that the same delivery tags can be used by different links on the same
session
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/7.1.x by this push:
new ec533b2 QPID-8323: [Broker-J] Make sure that the same delivery tags can be used by different links on the same session
ec533b2 is described below
commit ec533b24177f03ce651fe5f113f6bbaaa8686458
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Jun 27 16:46:27 2019 +0100
QPID-8323: [Broker-J] Make sure that the same delivery tags can be used by different links on the same session
(cherry picked from commit 2909380370a86dc0deb66bedae0245cfcb8274fb)
---
.../v1_0/AbstractReceivingLinkEndpoint.java | 2 +-
.../server/protocol/v1_0/SendingLinkEndpoint.java | 6 +-
.../qpid/server/protocol/v1_0/Session_1_0.java | 44 ++++++--
.../v1_0/StandardReceivingLinkEndpoint.java | 5 +
.../protocol/v1_0/delivery/DeliveryRegistry.java | 2 +-
.../v1_0/delivery/DeliveryRegistryImpl.java | 12 +-
.../protocol/v1_0/delivery/UnsettledDelivery.java | 24 ++++
.../v1_0/delivery/DeliveryRegistryImplTest.java | 123 +++++++++++++++++++++
.../v1_0/delivery/UnsettledDeliveryTest.java | 99 +++++++++++++++++
.../protocol/v1_0/messaging/TransferTest.java | 62 ++++++++++-
10 files changed, 353 insertions(+), 26 deletions(-)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index 3b922e4..f346dbe 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -341,7 +341,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
if (outcomeUpdate || settled)
{
- getSession().updateDisposition(getRole(), unsettledKeys, state, settled);
+ getSession().updateDisposition(this, deliveryTags, state, settled);
}
if (settled)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 2fb7421..5081d14 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -24,7 +24,6 @@ package org.apache.qpid.server.protocol.v1_0;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -50,10 +49,7 @@ import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
-import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -648,7 +644,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
if (settled && (_unsettled.remove(deliveryTag) != null))
{
- getSession().updateDisposition(getRole(), deliveryTag, state, settled);
+ getSession().updateDisposition(this, deliveryTag, state, settled);
}
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index cd8eb83..931cf95 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -248,32 +248,54 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
send(disposition);
}
- void updateDisposition(final Role role,
+ void updateDisposition(final LinkEndpoint<?,?> linkEndpoint,
final Binary deliveryTag,
final DeliveryState state,
final boolean settled)
{
- final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
- UnsignedInteger deliveryId = deliveryRegistry.getDeliveryIdByTag(deliveryTag);
+ final UnsignedInteger deliveryId = getDeliveryId(deliveryTag, linkEndpoint);
+ updateDisposition(linkEndpoint.getRole(), deliveryId, deliveryId, state, settled);
+ }
+
+ private UnsignedInteger getDeliveryId(final DeliveryRegistry deliveryRegistry,
+ final Binary deliveryTag,
+ final LinkEndpoint<?, ?> linkEndpoint)
+ {
+ final UnsignedInteger deliveryId = deliveryRegistry.getDeliveryId(deliveryTag, linkEndpoint);
if (deliveryId == null)
{
throw new ConnectionScopedRuntimeException(String.format(
"Delivery with tag '%s' is not found in unsettled deliveries", deliveryTag));
}
- updateDisposition(role, deliveryId, deliveryId, state, settled);
+ return deliveryId;
}
- void updateDisposition(final Role role,
+ private SortedSet<UnsignedInteger> getDeliveryIds(final Set<Binary> deliveryTags, final LinkEndpoint<?, ?> linkEndpoint)
+ {
+ final DeliveryRegistry deliveryRegistry = getDeliveryRegistry(linkEndpoint.getRole());
+ return deliveryTags.stream()
+ .map(deliveryTag -> getDeliveryId(deliveryRegistry, deliveryTag, linkEndpoint))
+ .collect(Collectors.toCollection(TreeSet::new));
+ }
+
+ private UnsignedInteger getDeliveryId(final Binary deliveryTag, final LinkEndpoint<?, ?> linkEndpoint)
+ {
+ final DeliveryRegistry deliveryRegistry = getDeliveryRegistry(linkEndpoint.getRole());
+ return getDeliveryId(deliveryRegistry, deliveryTag, linkEndpoint);
+ }
+
+ private DeliveryRegistry getDeliveryRegistry(final Role role)
+ {
+ return role == Role.RECEIVER ? getIncomingDeliveryRegistry() : getOutgoingDeliveryRegistry();
+ }
+
+ void updateDisposition(final LinkEndpoint<?,?> linkEndpoint,
final Set<Binary> deliveryTags,
final DeliveryState state,
final boolean settled)
{
- final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
- SortedSet<UnsignedInteger> deliveryIds = deliveryTags.stream()
- .map(deliveryRegistry::getDeliveryIdByTag)
- .collect(Collectors.toCollection(TreeSet::new));
-
- final Iterator<UnsignedInteger> iterator = deliveryIds.iterator();
+ final Role role = linkEndpoint.getRole();
+ final Iterator<UnsignedInteger> iterator = getDeliveryIds(deliveryTags, linkEndpoint).iterator();
if (iterator.hasNext())
{
UnsignedInteger begin = iterator.next();
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 9be4dcc..bc7d4b0 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -32,7 +32,10 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
@@ -46,6 +49,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.plugin.MessageFormat;
import org.apache.qpid.server.protocol.MessageFormatRegistry;
+import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorRuntimeException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -67,6 +71,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.AsyncCommand;
import org.apache.qpid.server.txn.AutoCommitTransaction;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
index a018824..3127653 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
@@ -30,6 +30,6 @@ public interface DeliveryRegistry
void removeDelivery(UnsignedInteger deliveryId);
UnsettledDelivery getDelivery(UnsignedInteger deliveryId);
void removeDeliveriesForLinkEndpoint(LinkEndpoint<?, ?> linkEndpoint);
- UnsignedInteger getDeliveryIdByTag(Binary deliveryTag);
+ UnsignedInteger getDeliveryId(Binary deliveryTag, LinkEndpoint<?, ?> linkEndpoint);
int size();
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
index bf7fe56..3e264fc 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
@@ -31,13 +31,13 @@ import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
public class DeliveryRegistryImpl implements DeliveryRegistry
{
private final Map<UnsignedInteger, UnsettledDelivery> _deliveries = new ConcurrentHashMap<>();
- private final Map<Binary, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>();
+ private final Map<UnsettledDelivery, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>();
@Override
public void addDelivery(final UnsignedInteger deliveryId, final UnsettledDelivery unsettledDelivery)
{
_deliveries.put(deliveryId, unsettledDelivery);
- _deliveryIds.put(unsettledDelivery.getDeliveryTag(), deliveryId);
+ _deliveryIds.put(unsettledDelivery, deliveryId);
}
@Override
@@ -46,7 +46,7 @@ public class DeliveryRegistryImpl implements DeliveryRegistry
UnsettledDelivery unsettledDelivery = _deliveries.remove(deliveryId);
if (unsettledDelivery != null)
{
- _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+ _deliveryIds.remove(unsettledDelivery);
}
}
@@ -66,15 +66,15 @@ public class DeliveryRegistryImpl implements DeliveryRegistry
if (unsettledDelivery.getLinkEndpoint() == linkEndpoint)
{
iterator.remove();
- _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+ _deliveryIds.remove(unsettledDelivery);
}
}
}
@Override
- public UnsignedInteger getDeliveryIdByTag(final Binary deliveryTag)
+ public UnsignedInteger getDeliveryId(final Binary deliveryTag, final LinkEndpoint<?, ?> linkEndpoint)
{
- return _deliveryIds.get(deliveryTag);
+ return _deliveryIds.get(new UnsettledDelivery(deliveryTag, linkEndpoint));
}
@Override
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
index 48d2ab1..d581142 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v1_0.delivery;
+import java.util.Objects;
+
import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -43,4 +45,26 @@ public class UnsettledDelivery
{
return _linkEndpoint;
}
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+ final UnsettledDelivery that = (UnsettledDelivery) o;
+ return Objects.equals(_deliveryTag, that._deliveryTag) &&
+ Objects.equals(_linkEndpoint, that._linkEndpoint);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(_deliveryTag, _linkEndpoint);
+ }
}
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java
new file mode 100644
index 0000000..54e7fb4
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImplTest.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class DeliveryRegistryImplTest extends UnitTestBase
+{
+ private static final UnsignedInteger DELIVERY_ID = UnsignedInteger.ZERO;
+ private static final UnsignedInteger DELIVERY_ID_2 = UnsignedInteger.ONE;
+ private static final Binary DELIVERY_TAG = new Binary(new byte[]{(byte) 32, (byte) 33});
+ private static final Binary DELIVERY_TAG_2 = new Binary(new byte[]{(byte) 32});
+
+ private DeliveryRegistryImpl _registry;
+ private UnsettledDelivery _unsettledDelivery;
+
+ @Before
+ public void setUp()
+ {
+ _registry = new DeliveryRegistryImpl();
+ _unsettledDelivery = new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class));
+ }
+
+ @Test
+ public void addDelivery()
+ {
+ assertThat(_registry.size(), is(equalTo(0)));
+
+ _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+
+ assertThat(_registry.size(), is(equalTo(1)));
+ }
+
+ @Test
+ public void removeDelivery()
+ {
+ _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+ assertThat(_registry.size(), is(equalTo(1)));
+ _registry.removeDelivery(DELIVERY_ID);
+ assertThat(_registry.size(), is(equalTo(0)));
+ assertThat(_registry.getDelivery(UnsignedInteger.ZERO), is(nullValue()));
+ }
+
+ @Test
+ public void getDelivery()
+ {
+ _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+
+ assertThat(_registry.size(), is(equalTo(1)));
+ final UnsettledDelivery expected =
+ new UnsettledDelivery(_unsettledDelivery.getDeliveryTag(), _unsettledDelivery.getLinkEndpoint());
+ assertThat(_registry.getDelivery(UnsignedInteger.ZERO), is(equalTo(expected)));
+ }
+
+ @Test
+ public void removeDeliveriesForLinkEndpoint()
+ {
+ _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+ _registry.addDelivery(DELIVERY_ID_2, new UnsettledDelivery(DELIVERY_TAG_2, _unsettledDelivery.getLinkEndpoint()));
+ _registry.addDelivery(UnsignedInteger.valueOf(2), new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class)));
+
+ assertThat(_registry.size(), is(equalTo(3)));
+
+ _registry.removeDeliveriesForLinkEndpoint(_unsettledDelivery.getLinkEndpoint());
+
+ assertThat(_registry.size(), is(equalTo(1)));
+ }
+
+ @Test
+ public void getDeliveryId()
+ {
+ _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+ _registry.addDelivery(DELIVERY_ID_2, new UnsettledDelivery(DELIVERY_TAG, mock(LinkEndpoint.class)));
+
+ final UnsignedInteger deliveryId = _registry.getDeliveryId(DELIVERY_TAG, _unsettledDelivery.getLinkEndpoint());
+
+ assertThat(deliveryId, is(equalTo(DELIVERY_ID)));
+ }
+
+ @Test
+ public void size()
+ {
+ assertThat(_registry.size(), is(equalTo(0)));
+
+ _registry.addDelivery(DELIVERY_ID, _unsettledDelivery);
+
+ assertThat(_registry.size(), is(equalTo(1)));
+
+ _registry.removeDelivery(DELIVERY_ID);
+
+ assertThat(_registry.size(), is(equalTo(0)));
+ }
+}
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java
new file mode 100644
index 0000000..51195c6
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDeliveryTest.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import java.util.Objects;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class UnsettledDeliveryTest extends UnitTestBase
+{
+
+ private static final byte[] DATA = new byte[]{(byte) 32, (byte) 33, (byte) 34};
+ private Binary _deliveryTag;
+ private LinkEndpoint<?, ?> _linkEndpoint;
+ private UnsettledDelivery _unsettledDelivery;
+
+ @Before
+ public void setUp()
+ {
+ _deliveryTag = new Binary(DATA);
+ _linkEndpoint = mock(LinkEndpoint.class);
+ _unsettledDelivery = new UnsettledDelivery(_deliveryTag, _linkEndpoint);
+ }
+
+ @Test
+ public void testGetDeliveryTag()
+ {
+ assertThat(_unsettledDelivery.getDeliveryTag(), is(equalTo(_deliveryTag)));
+ }
+
+ @Test
+ public void testGetLinkEndpoint()
+ {
+ assertThat(_unsettledDelivery.getLinkEndpoint(), is(equalTo(_linkEndpoint)));
+ }
+
+ @Test
+ public void testEqualsToNewUnsettledDeliveryWithTheSameFields()
+ {
+ assertThat(_unsettledDelivery.equals(new UnsettledDelivery(_deliveryTag, _linkEndpoint)), is(equalTo(true)));
+ }
+
+ @Test
+ public void testEqualsToNewUnsettledDeliveryWithEqualsFields()
+ {
+ assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(DATA), _linkEndpoint)),
+ is(equalTo(true)));
+ }
+
+ @Test
+ public void testNotEqualsWhenDeliveryTagIsDifferent()
+ {
+ assertThat(_unsettledDelivery.equals(new UnsettledDelivery(new Binary(new byte[]{(byte) 32, (byte) 33}),
+ _linkEndpoint)), is(equalTo(false)));
+ }
+
+ @Test
+ public void testNotEqualsWhenLinkEndpointIsDifferent()
+ {
+ // Reuse the same delivery tag so that the link endpoint is the only differing field.
+ final LinkEndpoint<?, ?> linkEndpoint = mock(LinkEndpoint.class);
+ assertThat(_unsettledDelivery.equals(new UnsettledDelivery(_deliveryTag, linkEndpoint)), is(equalTo(false)));
+ }
+
+ @Test
+ public void testHashCode()
+ {
+ int expected = Objects.hash(_deliveryTag, _linkEndpoint);
+ assertThat(_unsettledDelivery.hashCode(), is(equalTo(expected)));
+ }
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 7d1b1b6..da7f82a 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -21,6 +21,7 @@
package org.apache.qpid.tests.protocol.v1_0.messaging;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -31,8 +32,8 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
@@ -79,7 +80,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.util.SystemUtils;
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
@@ -196,6 +196,64 @@ public class TransferTest extends BrokerAdminUsingTestBase
}
@Test
+ @SpecificationTest(section = "2.6.12 Transferring A Message",
+ description = "The delivery-tag MUST be unique amongst all deliveries"
+ + " that could be considered unsettled by either end of the link.")
+ public void transferMessagesWithTheSameDeliveryTagOnSeparateLinksBelongingToTheSameSession() throws Exception
+ {
+ try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final UnsignedInteger link1Handle = UnsignedInteger.ONE;
+ final UnsignedInteger link2Handle = UnsignedInteger.valueOf(2);
+ final Binary deliveryTag = new Binary("deliveryTag".getBytes(StandardCharsets.UTF_8));
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+
+ .attachName("test1")
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attachHandle(link1Handle)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .attachName("test2")
+ .attachHandle(link2Handle)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferHandle(link1Handle)
+ .transferPayloadData("testData")
+ .transferDeliveryTag(deliveryTag)
+ .transferDeliveryId(UnsignedInteger.ZERO)
+ .transfer()
+ .transferHandle(link2Handle)
+ .transferDeliveryId(UnsignedInteger.ONE)
+ .transferPayloadData("testData2")
+ .transferDeliveryTag(deliveryTag)
+ .transfer();
+
+ final Disposition disposition1 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+ final UnsignedInteger first = disposition1.getFirst();
+ final UnsignedInteger last = disposition1.getLast();
+
+ assertThat(first, anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+ assertThat(last, anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+
+ if (last == null || first.equals(last))
+ {
+ final Disposition disposition2 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+ assertThat(disposition2.getFirst(), anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+ assertThat(disposition2.getLast(), anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
+ assertThat(disposition2.getFirst(), is(not(equalTo(first))));
+ }
+ }
+ }
+
+ @Test
@SpecificationTest(section = "2.7.5",
description = "If first, this indicates that the receiver MUST settle the delivery once it has arrived without waiting for the sender to settle first")
public void transferReceiverSettleModeFirst() throws Exception
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org