You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/06/27 16:22:48 UTC

[qpid-broker-j] 02/02: QPID-8328: [AMQP 1.0] Add link stealing tests

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 41edc4b476f5c9b0483d640392ed79d0d779a4d5
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Jun 27 17:07:55 2019 +0100

    QPID-8328: [AMQP 1.0] Add link stealing tests
---
 .../qpid/tests/protocol/v1_0/Interaction.java      |  19 +++
 .../v1_0/transport/link/LinkStealingTest.java      | 136 +++++++++++++++++++++
 2 files changed, 155 insertions(+)

diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 5570966..ac30068 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -1144,4 +1144,23 @@ public class Interaction extends AbstractInteraction<Interaction>
         sendPerformative(EMPTY_FRAME, UnsignedShort.ZERO);
         return this;
     }
+
+    public <T> T consume(final Class<T> expected, final Class<?>... ignore)
+            throws Exception
+    {
+        final Class<?>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1);
+        expectedResponses[ignore.length] = expected;
+
+        T completed = null;
+        do
+        {
+            Response<?> response = consumeResponse(expectedResponses).getLatestResponse();
+            if (expected.isAssignableFrom(response.getBody().getClass()))
+            {
+                completed = (T) response.getBody();
+            }
+        }
+        while (completed == null);
+        return completed;
+    }
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
new file mode 100644
index 0000000..260a3bc
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.transport.link;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class LinkStealingTest extends BrokerAdminUsingTestBase
+{
+    @Test
+    @SpecificationTest(section = "2.6.1. Naming a link",
+                       description = "Consequently, a link can only be active in one connection at a time."
+                                     + " If an attempt is made to attach the link subsequently when it is not suspended,"
+                                     + " then the link can be ’stolen’, i.e., the second attach succeeds and the first"
+                                     + " attach MUST then be closed with a link error of stolen. This behavior ensures"
+                                     + " that in the event of a connection failure occurring and being noticed"
+                                     + " by one party, that re-establishment has the desired effect.")
+    @Ignore("QPID-8328: Broker erroneously ends the session with internal error")
+    public void subsequentAttachOnTheSameSession() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final Attach responseAttach = interaction
+                    .negotiateProtocol().consumeResponse()
+                    .open().consumeResponse(Open.class)
+                    .begin().consumeResponse(Begin.class)
+                    .attachRole(Role.SENDER)
+                    .attachInitialDeliveryCount(UnsignedInteger.ZERO)
+                    .attachHandle(UnsignedInteger.ZERO)
+                    .attach().consumeResponse()
+                    .getLatestResponse(Attach.class);
+            assertThat(responseAttach.getName(), is(notNullValue()));
+            assertThat(responseAttach.getHandle().longValue(), is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach.getRole(), is(Role.RECEIVER));
+            assertThat(responseAttach.getTarget(), is(notNullValue()));
+            assertThat(responseAttach.getSource(), is(notNullValue()));
+
+            Detach stolenDetach = interaction.consumeResponse(Flow.class)
+                                             .attachHandle(UnsignedInteger.ONE)
+                                             .attach()
+                                             .consume(Detach.class, Attach.class, Flow.class);
+
+            assertThat(stolenDetach.getHandle().longValue(), is(equalTo(responseAttach.getHandle().longValue())));
+            assertThat(stolenDetach.getError(), is(notNullValue()));
+            assertThat(stolenDetach.getError().getCondition(), is(equalTo(LinkError.STOLEN)));
+        }
+    }
+
+
+
+    @Test
+    @SpecificationTest(section = "2.6.1. Naming a link", description = "")
+    public void subsequentAttachOnDifferentSessions() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final String linkName = "test";
+            final Attach responseAttach = interaction.negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse(Open.class)
+                                                     .begin().consumeResponse(Begin.class)
+                                                     .attachRole(Role.SENDER)
+                                                     .attachName(linkName)
+                                                     .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .attach().consumeResponse()
+                                                     .getLatestResponse(Attach.class);
+            assertThat(responseAttach.getName(), is(notNullValue()));
+            assertThat(responseAttach.getHandle().longValue(),
+                       is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach.getRole(), is(Role.RECEIVER));
+
+            interaction.consumeResponse(Flow.class);
+
+            final Detach stolenDetach = interaction.sessionChannel(UnsignedShort.valueOf(2))
+                                                   .begin().consumeResponse(Begin.class)
+                                                   .attachRole(Role.SENDER)
+                                                   .attachName(linkName)
+                                                   .attachInitialDeliveryCount(UnsignedInteger.ZERO)
+                                                   .attach()
+                                                   .consume(Detach.class, Attach.class, Flow.class, Disposition.class);
+
+            assertThat(stolenDetach.getHandle().longValue(), is(equalTo(responseAttach.getHandle().longValue())));
+            assertThat(stolenDetach.getError(), is(notNullValue()));
+            assertThat(stolenDetach.getError().getCondition(), is(equalTo(LinkError.STOLEN)));
+
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org