You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/06/27 16:22:48 UTC
[qpid-broker-j] 02/02: QPID-8328: [AMQP 1.0] Add link stealing tests
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 41edc4b476f5c9b0483d640392ed79d0d779a4d5
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Jun 27 17:07:55 2019 +0100
QPID-8328: [AMQP 1.0] Add link stealing tests
---
.../qpid/tests/protocol/v1_0/Interaction.java | 19 +++
.../v1_0/transport/link/LinkStealingTest.java | 136 +++++++++++++++++++++
2 files changed, 155 insertions(+)
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 5570966..ac30068 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -1144,4 +1144,23 @@ public class Interaction extends AbstractInteraction<Interaction>
sendPerformative(EMPTY_FRAME, UnsignedShort.ZERO);
return this;
}
+
+ public <T> T consume(final Class<T> expected, final Class<?>... ignore)
+ throws Exception
+ {
+ final Class<?>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1);
+ expectedResponses[ignore.length] = expected;
+
+ T completed = null;
+ do
+ {
+ Response<?> response = consumeResponse(expectedResponses).getLatestResponse();
+ if (expected.isAssignableFrom(response.getBody().getClass()))
+ {
+ completed = (T) response.getBody();
+ }
+ }
+ while (completed == null);
+ return completed;
+ }
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
new file mode 100644
index 0000000..260a3bc
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.transport.link;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class LinkStealingTest extends BrokerAdminUsingTestBase
+{
+ @Test
+ @SpecificationTest(section = "2.6.1. Naming a link",
+ description = "Consequently, a link can only be active in one connection at a time."
+ + " If an attempt is made to attach the link subsequently when it is not suspended,"
+ + " then the link can be ’stolen’, i.e., the second attach succeeds and the first"
+ + " attach MUST then be closed with a link error of stolen. This behavior ensures"
+ + " that in the event of a connection failure occurring and being noticed"
+ + " by one party, that re-establishment has the desired effect.")
+ @Ignore("QPID-8328: Broker erroneously ends the session with internal error")
+ public void subsequentAttachOnTheSameSession() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final Attach responseAttach = interaction
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachInitialDeliveryCount(UnsignedInteger.ZERO)
+ .attachHandle(UnsignedInteger.ZERO)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach.getName(), is(notNullValue()));
+ assertThat(responseAttach.getHandle().longValue(), is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseAttach.getRole(), is(Role.RECEIVER));
+ assertThat(responseAttach.getTarget(), is(notNullValue()));
+ assertThat(responseAttach.getSource(), is(notNullValue()));
+
+ Detach stolenDetach = interaction.consumeResponse(Flow.class)
+ .attachHandle(UnsignedInteger.ONE)
+ .attach()
+ .consume(Detach.class, Attach.class, Flow.class);
+
+ assertThat(stolenDetach.getHandle().longValue(), is(equalTo(responseAttach.getHandle().longValue())));
+ assertThat(stolenDetach.getError(), is(notNullValue()));
+ assertThat(stolenDetach.getError().getCondition(), is(equalTo(LinkError.STOLEN)));
+ }
+ }
+
+
+
+ @Test
+ @SpecificationTest(section = "2.6.1. Naming a link", description = "")
+ public void subsequentAttachOnDifferentSessions() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final String linkName = "test";
+ final Attach responseAttach = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachName(linkName)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach.getName(), is(notNullValue()));
+ assertThat(responseAttach.getHandle().longValue(),
+ is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseAttach.getRole(), is(Role.RECEIVER));
+
+ interaction.consumeResponse(Flow.class);
+
+ final Detach stolenDetach = interaction.sessionChannel(UnsignedShort.valueOf(2))
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachName(linkName)
+ .attachInitialDeliveryCount(UnsignedInteger.ZERO)
+ .attach()
+ .consume(Detach.class, Attach.class, Flow.class, Disposition.class);
+
+ assertThat(stolenDetach.getHandle().longValue(), is(equalTo(responseAttach.getHandle().longValue())));
+ assertThat(stolenDetach.getError(), is(notNullValue()));
+ assertThat(stolenDetach.getError().getCondition(), is(equalTo(LinkError.STOLEN)));
+
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org