You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/11/01 16:57:27 UTC
qpid-broker-j git commit: QPID-8201: [Broker-J][AMQP 1.0] Delete explicitly only subscription queue with source terminus expiry policy 'never'
Repository: qpid-broker-j
Updated Branches:
refs/heads/master b74b157c3 -> ce9cba805
QPID-8201: [Broker-J][AMQP 1.0] Delete explicitly only subscription queue with source terminus expiry policy 'never'
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ce9cba80
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ce9cba80
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ce9cba80
Branch: refs/heads/master
Commit: ce9cba805db6f97fb78682fe59bc2a48923f0d0a
Parents: b74b157
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Nov 1 16:56:53 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Nov 1 16:56:53 2018 +0000
----------------------------------------------------------------------
.../protocol/v1_0/SendingLinkEndpoint.java | 8 +-
.../bindmapjms/TopicDestinationTest.java | 341 +++++++++++++++++++
2 files changed, 347 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ce9cba80/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 17bcfa1..8919413 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -833,11 +833,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
|| ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing())
|| (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
{
+ cleanUpUnsettledDeliveries();
+ }
+
+ if (close)
+ {
Error closingError = null;
if (getDestination() instanceof ExchangeSendingDestination
- && addressSpace instanceof QueueManagingVirtualHost)
+ && addressSpace instanceof QueueManagingVirtualHost && TerminusExpiryPolicy.NEVER.equals(expiryPolicy))
{
- cleanUpUnsettledDeliveries();
try
{
((QueueManagingVirtualHost) addressSpace).removeSubscriptionQueue(
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ce9cba80/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TopicDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TopicDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TopicDestinationTest.java
new file mode 100644
index 0000000..dde3e51
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TopicDestinationTest.java
@@ -0,0 +1,341 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class TopicDestinationTest extends BrokerAdminUsingTestBase
+{
+ private static final Symbol TOPIC = Symbol.valueOf("topic");
+ private static final Symbol GLOBAL = Symbol.valueOf("global");
+ private static final Symbol SHARED = Symbol.valueOf("shared");
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ @SpecificationTest(section = "5.2",
+ description = "In order to facilitate these actions for the various Destination types that JMS supports,"
+ + " type information SHOULD be conveyed when creating producer or consumer links"
+ + " for the application by supplying a terminus capability for the particular"
+ + " Destination type to which the client expects to attach."
+ + " Destination Type = Topic, terminus capability (type) = topic")
+ public void nonSharedVolatileSubscriptionLinkAttachDetach() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Source source = new Source();
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ source.setCapabilities(new Symbol[]{TOPIC});
+ source.setAddress("amq.topic");
+ source.setDurable(TerminusDurability.NONE);
+
+ final Interaction interaction = transport.newInteraction();
+ final Attach responseAttach = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSource(source)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach.getName(), is(notNullValue()));
+ assertThat(responseAttach.getHandle().longValue(),
+ is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseAttach.getRole(), is(Role.SENDER));
+
+ final Detach responseDetach = interaction.detachClose(true)
+ .detach()
+ .consumeResponse()
+ .getLatestResponse(Detach.class);
+ assertThat(responseDetach.getClosed(), is(equalTo(Boolean.TRUE)));
+ assertThat(responseDetach.getError(), is(nullValue()));
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ @Test
+ public void nonSharedDurableSubscriptionLinkAttachDetach() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Source source = new Source();
+ source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ source.setCapabilities(new Symbol[]{TOPIC});
+ source.setAddress("amq.topic");
+ source.setDurable(TerminusDurability.UNSETTLED_STATE);
+
+ final Interaction interaction = transport.newInteraction();
+ final Attach responseAttach = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSource(source)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach.getName(), is(notNullValue()));
+ assertThat(responseAttach.getHandle().longValue(),
+ is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseAttach.getRole(), is(Role.SENDER));
+
+ final Detach responseDetach = interaction.detachClose(true)
+ .detach()
+ .consumeResponse()
+ .getLatestResponse(Detach.class);
+ assertThat(responseDetach.getClosed(), is(equalTo(Boolean.TRUE)));
+ assertThat(responseDetach.getError(), is(nullValue()));
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ @Test
+ public void sharedGlobalVolatileSubscriptionLinkAttachDetach() throws Exception
+ {
+ String subscriptionName = "foo";
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Source source = new Source();
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ source.setCapabilities(new Symbol[]{TOPIC, GLOBAL, SHARED});
+ source.setAddress("amq.topic");
+ source.setDurable(TerminusDurability.NONE);
+
+ final Interaction interaction = transport.newInteraction();
+ final Attach responseAttach1 = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachName(subscriptionName + "|global-volatile")
+ .attachHandle(UnsignedInteger.ZERO)
+ .attachRole(Role.RECEIVER)
+ .attachSource(source)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach1.getName(), is(notNullValue()));
+ assertThat(responseAttach1.getHandle().longValue(),
+ is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseAttach1.getRole(), is(Role.SENDER));
+
+ final Attach responseAttach2 = interaction.attachName(subscriptionName + "|global-volatile2")
+ .attachHandle(UnsignedInteger.ONE)
+ .attachRole(Role.RECEIVER)
+ .attachSource(source)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach2.getName(), is(notNullValue()));
+ assertThat(responseAttach2.getHandle().longValue(),
+ is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseAttach2.getRole(), is(Role.SENDER));
+
+
+ final Detach responseDetach2 = interaction.detachClose(false)
+ .detachHandle(UnsignedInteger.ONE)
+ .detach()
+ .consumeResponse()
+ .getLatestResponse(Detach.class);
+ assertThat(responseDetach2.getError(), is(nullValue()));
+
+
+ final Detach responseDetach1 = interaction.detachClose(true)
+ .detachHandle(UnsignedInteger.ZERO)
+ .detach()
+ .consumeResponse()
+ .getLatestResponse(Detach.class);
+ assertThat(responseDetach1.getClosed(), is(equalTo(Boolean.TRUE)));
+ assertThat(responseDetach1.getError(), is(nullValue()));
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ @Test
+ public void sharedGlobalDurableSubscriptionLinkAttachDetach() throws Exception
+ {
+ String subscriptionName = "foo";
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Source source = new Source();
+ source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ source.setCapabilities(new Symbol[]{TOPIC, GLOBAL, SHARED});
+ source.setAddress("amq.topic");
+ source.setDurable(TerminusDurability.CONFIGURATION);
+
+ final Interaction interaction = transport.newInteraction();
+ final Attach responseAttach1 = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachName(subscriptionName + "|global")
+ .attachHandle(UnsignedInteger.ZERO)
+ .attachRole(Role.RECEIVER)
+ .attachSource(source)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach1.getName(), is(notNullValue()));
+ assertThat(responseAttach1.getHandle().longValue(),
+ is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseAttach1.getRole(), is(Role.SENDER));
+
+ final Attach responseAttach2 = interaction.attachName(subscriptionName + "|global2")
+ .attachHandle(UnsignedInteger.ONE)
+ .attachRole(Role.RECEIVER)
+ .attachSource(source)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach2.getName(), is(notNullValue()));
+ assertThat(responseAttach2.getHandle().longValue(),
+ is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseAttach2.getRole(), is(Role.SENDER));
+
+
+ final Detach responseDetach2 = interaction.detachClose(false)
+ .detachHandle(UnsignedInteger.ONE)
+ .detach()
+ .consumeResponse()
+ .getLatestResponse(Detach.class);
+ assertThat(responseDetach2.getClosed(), is(not(equalTo(Boolean.TRUE))));
+ assertThat(responseDetach2.getError(), is(nullValue()));
+
+
+ final Detach responseDetach1 = interaction.detachClose(false)
+ .detachHandle(UnsignedInteger.ZERO)
+ .detach()
+ .consumeResponse()
+ .getLatestResponse(Detach.class);
+ assertThat(responseDetach1.getClosed(), is(not(equalTo(Boolean.TRUE))));
+ assertThat(responseDetach1.getError(), is(nullValue()));
+
+
+ final Attach responseAttach3 = interaction.attachName(subscriptionName + "|global")
+ .attachHandle(UnsignedInteger.valueOf(2))
+ .attachRole(Role.RECEIVER)
+ .attachSource(null)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach3.getName(), is(notNullValue()));
+ assertThat(responseAttach3.getHandle().longValue(),
+ is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseAttach3.getRole(), is(Role.SENDER));
+ assertThat(responseAttach3.getSource(), is(notNullValue()));
+ assertThat(responseAttach3.getSource(), is(instanceOf(Source.class)));
+ assertThat(((Source)responseAttach3.getSource()).getAddress(), is(equalTo("amq.topic")));
+
+ final Detach responseDetach3 = interaction.detachClose(true)
+ .detachHandle(UnsignedInteger.valueOf(2))
+ .detach()
+ .consumeResponse()
+ .getLatestResponse(Detach.class);
+ assertThat(responseDetach3.getClosed(), is(equalTo(Boolean.TRUE)));
+ assertThat(responseDetach3.getError(), is(nullValue()));
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ @Test
+ public void sharedGlobalDurableSubscriptionCloseWithActiveLink() throws Exception
+ {
+ String subscriptionName = "foo";
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Source source = new Source();
+ source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ source.setCapabilities(new Symbol[]{TOPIC, GLOBAL, SHARED});
+ source.setAddress("amq.topic");
+ source.setDurable(TerminusDurability.CONFIGURATION);
+
+ final Interaction interaction = transport.newInteraction();
+ final Attach responseAttach1 = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachName(subscriptionName + "|global")
+ .attachHandle(UnsignedInteger.ZERO)
+ .attachRole(Role.RECEIVER)
+ .attachSource(source)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach1.getName(), is(notNullValue()));
+ assertThat(responseAttach1.getHandle().longValue(),
+ is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseAttach1.getRole(), is(Role.SENDER));
+
+ final Attach responseAttach2 = interaction.attachName(subscriptionName + "|global2")
+ .attachHandle(UnsignedInteger.ONE)
+ .attachRole(Role.RECEIVER)
+ .attachSource(source)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+ assertThat(responseAttach2.getName(), is(notNullValue()));
+ assertThat(responseAttach2.getHandle().longValue(),
+ is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseAttach2.getRole(), is(Role.SENDER));
+
+
+ final Detach responseDetach1 = interaction.detachClose(true)
+ .detachHandle(UnsignedInteger.ZERO)
+ .detach()
+ .consumeResponse()
+ .getLatestResponse(Detach.class);
+ assertThat(responseDetach1.getClosed(), is(equalTo(Boolean.TRUE)));
+ assertThat(responseDetach1.getError(), is(notNullValue()));
+ assertThat(responseDetach1.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED)));
+
+ interaction.doCloseConnection();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org