You are viewing a plain text version of this content. The canonical version is available from the original mailing-list archive.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/11/01 16:57:27 UTC

qpid-broker-j git commit: QPID-8201: [Broker-J][AMQP 1.0] Explicitly delete the subscription queue only when the source terminus expiry policy is 'never'

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master b74b157c3 -> ce9cba805


QPID-8201: [Broker-J][AMQP 1.0] Explicitly delete the subscription queue only when the source terminus expiry policy is 'never'


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ce9cba80
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ce9cba80
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ce9cba80

Branch: refs/heads/master
Commit: ce9cba805db6f97fb78682fe59bc2a48923f0d0a
Parents: b74b157
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Nov 1 16:56:53 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Nov 1 16:56:53 2018 +0000

----------------------------------------------------------------------
 .../protocol/v1_0/SendingLinkEndpoint.java      |   8 +-
 .../bindmapjms/TopicDestinationTest.java        | 341 +++++++++++++++++++
 2 files changed, 347 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ce9cba80/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 17bcfa1..8919413 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -833,11 +833,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
             || ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing())
             || (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
         {
+            cleanUpUnsettledDeliveries();
+        }
+
+        if (close)
+        {
             Error closingError = null;
             if (getDestination() instanceof ExchangeSendingDestination
-                && addressSpace instanceof QueueManagingVirtualHost)
+                && addressSpace instanceof QueueManagingVirtualHost && TerminusExpiryPolicy.NEVER.equals(expiryPolicy))
             {
-                cleanUpUnsettledDeliveries();
                 try
                 {
                     ((QueueManagingVirtualHost) addressSpace).removeSubscriptionQueue(

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ce9cba80/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TopicDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TopicDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TopicDestinationTest.java
new file mode 100644
index 0000000..dde3e51
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TopicDestinationTest.java
@@ -0,0 +1,341 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class TopicDestinationTest extends BrokerAdminUsingTestBase
+{
+    private static final Symbol TOPIC = Symbol.valueOf("topic");
+    private static final Symbol GLOBAL = Symbol.valueOf("global");
+    private static final Symbol SHARED = Symbol.valueOf("shared");
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    @SpecificationTest(section = "5.2",
+            description = "In order to facilitate these actions for the various Destination types that JMS supports,"
+                          + " type information SHOULD be conveyed when creating producer or consumer links"
+                          + " for the application by supplying a terminus capability for the particular"
+                          + " Destination type to which the client expects to attach."
+                          + " Destination Type = Topic, terminus capability (type) = topic")
+    public void nonSharedVolatileSubscriptionLinkAttachDetach() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Source source = new Source();
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+            source.setCapabilities(new Symbol[]{TOPIC});
+            source.setAddress("amq.topic");
+            source.setDurable(TerminusDurability.NONE);
+
+            final Interaction interaction = transport.newInteraction();
+            final Attach responseAttach = interaction.negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse(Open.class)
+                                                     .begin().consumeResponse(Begin.class)
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSource(source)
+                                                     .attach().consumeResponse()
+                                                     .getLatestResponse(Attach.class);
+            assertThat(responseAttach.getName(), is(notNullValue()));
+            assertThat(responseAttach.getHandle().longValue(),
+                       is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach.getRole(), is(Role.SENDER));
+
+            final Detach responseDetach = interaction.detachClose(true)
+                                                     .detach()
+                                                     .consumeResponse()
+                                                     .getLatestResponse(Detach.class);
+            assertThat(responseDetach.getClosed(), is(equalTo(Boolean.TRUE)));
+            assertThat(responseDetach.getError(), is(nullValue()));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    @Test
+    public void nonSharedDurableSubscriptionLinkAttachDetach() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Source source = new Source();
+            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+            source.setCapabilities(new Symbol[]{TOPIC});
+            source.setAddress("amq.topic");
+            source.setDurable(TerminusDurability.UNSETTLED_STATE);
+
+            final Interaction interaction = transport.newInteraction();
+            final Attach responseAttach = interaction.negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse(Open.class)
+                                                     .begin().consumeResponse(Begin.class)
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSource(source)
+                                                     .attach().consumeResponse()
+                                                     .getLatestResponse(Attach.class);
+            assertThat(responseAttach.getName(), is(notNullValue()));
+            assertThat(responseAttach.getHandle().longValue(),
+                       is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach.getRole(), is(Role.SENDER));
+
+            final Detach responseDetach = interaction.detachClose(true)
+                                                     .detach()
+                                                     .consumeResponse()
+                                                     .getLatestResponse(Detach.class);
+            assertThat(responseDetach.getClosed(), is(equalTo(Boolean.TRUE)));
+            assertThat(responseDetach.getError(), is(nullValue()));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    @Test
+    public void sharedGlobalVolatileSubscriptionLinkAttachDetach() throws Exception
+    {
+        String subscriptionName = "foo";
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Source source = new Source();
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+            source.setCapabilities(new Symbol[]{TOPIC, GLOBAL, SHARED});
+            source.setAddress("amq.topic");
+            source.setDurable(TerminusDurability.NONE);
+
+            final Interaction interaction = transport.newInteraction();
+            final Attach responseAttach1 = interaction.negotiateProtocol().consumeResponse()
+                                                      .open().consumeResponse(Open.class)
+                                                      .begin().consumeResponse(Begin.class)
+                                                      .attachName(subscriptionName + "|global-volatile")
+                                                      .attachHandle(UnsignedInteger.ZERO)
+                                                      .attachRole(Role.RECEIVER)
+                                                      .attachSource(source)
+                                                      .attach().consumeResponse()
+                                                      .getLatestResponse(Attach.class);
+            assertThat(responseAttach1.getName(), is(notNullValue()));
+            assertThat(responseAttach1.getHandle().longValue(),
+                       is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach1.getRole(), is(Role.SENDER));
+
+            final Attach responseAttach2 = interaction.attachName(subscriptionName + "|global-volatile2")
+                                                      .attachHandle(UnsignedInteger.ONE)
+                                                      .attachRole(Role.RECEIVER)
+                                                      .attachSource(source)
+                                                      .attach().consumeResponse()
+                                                      .getLatestResponse(Attach.class);
+            assertThat(responseAttach2.getName(), is(notNullValue()));
+            assertThat(responseAttach2.getHandle().longValue(),
+                       is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach2.getRole(), is(Role.SENDER));
+
+
+            final Detach responseDetach2 = interaction.detachClose(false)
+                                                     .detachHandle(UnsignedInteger.ONE)
+                                                     .detach()
+                                                     .consumeResponse()
+                                                     .getLatestResponse(Detach.class);
+            assertThat(responseDetach2.getError(), is(nullValue()));
+
+
+            final Detach responseDetach1 = interaction.detachClose(true)
+                                                      .detachHandle(UnsignedInteger.ZERO)
+                                                      .detach()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(Detach.class);
+            assertThat(responseDetach1.getClosed(), is(equalTo(Boolean.TRUE)));
+            assertThat(responseDetach1.getError(), is(nullValue()));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    @Test
+    public void sharedGlobalDurableSubscriptionLinkAttachDetach() throws Exception
+    {
+        String subscriptionName = "foo";
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Source source = new Source();
+            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+            source.setCapabilities(new Symbol[]{TOPIC, GLOBAL, SHARED});
+            source.setAddress("amq.topic");
+            source.setDurable(TerminusDurability.CONFIGURATION);
+
+            final Interaction interaction = transport.newInteraction();
+            final Attach responseAttach1 = interaction.negotiateProtocol().consumeResponse()
+                                                      .open().consumeResponse(Open.class)
+                                                      .begin().consumeResponse(Begin.class)
+                                                      .attachName(subscriptionName + "|global")
+                                                      .attachHandle(UnsignedInteger.ZERO)
+                                                      .attachRole(Role.RECEIVER)
+                                                      .attachSource(source)
+                                                      .attach().consumeResponse()
+                                                      .getLatestResponse(Attach.class);
+            assertThat(responseAttach1.getName(), is(notNullValue()));
+            assertThat(responseAttach1.getHandle().longValue(),
+                       is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach1.getRole(), is(Role.SENDER));
+
+            final Attach responseAttach2 = interaction.attachName(subscriptionName + "|global2")
+                                                      .attachHandle(UnsignedInteger.ONE)
+                                                      .attachRole(Role.RECEIVER)
+                                                      .attachSource(source)
+                                                      .attach().consumeResponse()
+                                                      .getLatestResponse(Attach.class);
+            assertThat(responseAttach2.getName(), is(notNullValue()));
+            assertThat(responseAttach2.getHandle().longValue(),
+                       is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach2.getRole(), is(Role.SENDER));
+
+
+            final Detach responseDetach2 = interaction.detachClose(false)
+                                                      .detachHandle(UnsignedInteger.ONE)
+                                                      .detach()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(Detach.class);
+            assertThat(responseDetach2.getClosed(), is(not(equalTo(Boolean.TRUE))));
+            assertThat(responseDetach2.getError(), is(nullValue()));
+
+
+            final Detach responseDetach1 = interaction.detachClose(false)
+                                                      .detachHandle(UnsignedInteger.ZERO)
+                                                      .detach()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(Detach.class);
+            assertThat(responseDetach1.getClosed(), is(not(equalTo(Boolean.TRUE))));
+            assertThat(responseDetach1.getError(), is(nullValue()));
+
+
+            final Attach responseAttach3 = interaction.attachName(subscriptionName + "|global")
+                                                      .attachHandle(UnsignedInteger.valueOf(2))
+                                                      .attachRole(Role.RECEIVER)
+                                                      .attachSource(null)
+                                                      .attach().consumeResponse()
+                                                      .getLatestResponse(Attach.class);
+            assertThat(responseAttach3.getName(), is(notNullValue()));
+            assertThat(responseAttach3.getHandle().longValue(),
+                       is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach3.getRole(), is(Role.SENDER));
+            assertThat(responseAttach3.getSource(), is(notNullValue()));
+            assertThat(responseAttach3.getSource(), is(instanceOf(Source.class)));
+            assertThat(((Source)responseAttach3.getSource()).getAddress(), is(equalTo("amq.topic")));
+
+            final Detach responseDetach3 = interaction.detachClose(true)
+                                                      .detachHandle(UnsignedInteger.valueOf(2))
+                                                      .detach()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(Detach.class);
+            assertThat(responseDetach3.getClosed(), is(equalTo(Boolean.TRUE)));
+            assertThat(responseDetach3.getError(), is(nullValue()));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    @Test
+    public void sharedGlobalDurableSubscriptionCloseWithActiveLink() throws Exception
+    {
+        String subscriptionName = "foo";
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Source source = new Source();
+            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+            source.setCapabilities(new Symbol[]{TOPIC, GLOBAL, SHARED});
+            source.setAddress("amq.topic");
+            source.setDurable(TerminusDurability.CONFIGURATION);
+
+            final Interaction interaction = transport.newInteraction();
+            final Attach responseAttach1 = interaction.negotiateProtocol().consumeResponse()
+                                                      .open().consumeResponse(Open.class)
+                                                      .begin().consumeResponse(Begin.class)
+                                                      .attachName(subscriptionName + "|global")
+                                                      .attachHandle(UnsignedInteger.ZERO)
+                                                      .attachRole(Role.RECEIVER)
+                                                      .attachSource(source)
+                                                      .attach().consumeResponse()
+                                                      .getLatestResponse(Attach.class);
+            assertThat(responseAttach1.getName(), is(notNullValue()));
+            assertThat(responseAttach1.getHandle().longValue(),
+                       is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach1.getRole(), is(Role.SENDER));
+
+            final Attach responseAttach2 = interaction.attachName(subscriptionName + "|global2")
+                                                      .attachHandle(UnsignedInteger.ONE)
+                                                      .attachRole(Role.RECEIVER)
+                                                      .attachSource(source)
+                                                      .attach().consumeResponse()
+                                                      .getLatestResponse(Attach.class);
+            assertThat(responseAttach2.getName(), is(notNullValue()));
+            assertThat(responseAttach2.getHandle().longValue(),
+                       is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach2.getRole(), is(Role.SENDER));
+
+
+            final Detach responseDetach1 = interaction.detachClose(true)
+                                                      .detachHandle(UnsignedInteger.ZERO)
+                                                      .detach()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(Detach.class);
+            assertThat(responseDetach1.getClosed(), is(equalTo(Boolean.TRUE)));
+            assertThat(responseDetach1.getError(), is(notNullValue()));
+            assertThat(responseDetach1.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED)));
+
+            interaction.doCloseConnection();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org