You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/03/13 15:33:50 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1034 - non-durable subscription queue not ended on link close

ARTEMIS-1034 - non-durable subscription queue not ended on link close

https://issues.apache.org/jira/browse/ARTEMIS-1034


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b5b6e4be
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b5b6e4be
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b5b6e4be

Branch: refs/heads/master
Commit: b5b6e4bea662a3eb7ab28f9b9ebb4cdb04801805
Parents: e13e014
Author: Andy Taylor <an...@gmail.com>
Authored: Mon Mar 13 13:52:28 2017 +0000
Committer: Justin Bertram <jb...@apache.org>
Committed: Mon Mar 13 10:33:13 2017 -0500

----------------------------------------------------------------------
 .../amqp/proton/ProtonServerSenderContext.java  | 27 ++++++++++++--------
 .../integration/amqp/ProtonPubSubTest.java      | 14 ++++++++++
 2 files changed, 31 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5b6e4be/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 55ad550..962110e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -94,6 +94,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    private boolean shared = false;
    private boolean global = false;
    private boolean isVolatile = false;
+   private String tempQueueName;
 
    public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
       super();
@@ -223,6 +224,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          // if dynamic we have to create the node (queue) and set the address on the target, the
          // node is temporary and  will be deleted on closing of the session
          queue = java.util.UUID.randomUUID().toString();
+         tempQueueName = queue;
          try {
             sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
             // protonSession.getServerSession().createQueue(queue, queue, null, true, false);
@@ -342,6 +344,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                   }
                } else {
                   queue = java.util.UUID.randomUUID().toString();
+                  tempQueueName = queue;
                   try {
                      sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
                   } catch (Exception e) {
@@ -445,16 +448,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                if (result.isExists() && source.getDynamic()) {
                   sessionSPI.deleteQueue(queueName);
                } else {
-                  String clientId = getClientId();
-                  String pubId = sender.getName();
-                  if (pubId.contains("|")) {
-                     pubId = pubId.split("\\|")[0];
-                  }
-                  String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
-                  result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
-                  //only delete if it isn't volatile and has no consumers
-                  if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
-                     sessionSPI.deleteQueue(queue);
+                  if (source.getDurable() == TerminusDurability.NONE && tempQueueName != null && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
+                     sessionSPI.removeTemporaryQueue(tempQueueName);
+                  } else {
+                     String clientId = getClientId();
+                     String pubId = sender.getName();
+                     if (pubId.contains("|")) {
+                        pubId = pubId.split("\\|")[0];
+                     }
+                     String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
+                     result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
+                     //only delete if it isn't volatile and has no consumers
+                     if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
+                        sessionSPI.deleteQueue(queue);
+                     }
                   }
                }
             } else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5b6e4be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
index 2ae9b8d..42f30ac 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
@@ -31,6 +31,7 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.After;
@@ -107,6 +108,19 @@ public class ProtonPubSubTest extends ProtonTestBase {
    }
 
    @Test
+   public void testNonDurablePubSubQueueDeleted() throws Exception {
+      int numMessages = 100;
+      Topic topic = createTopic(pubAddress);
+      TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer sub = session.createSubscriber(topic);
+      Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(pubAddress));
+      assertEquals(2, bindingsForAddress.getBindings().size());
+      sub.close();
+      Thread.sleep(1000);
+      assertEquals(1, bindingsForAddress.getBindings().size());
+   }
+
+   @Test
    public void testNonDurableMultiplePubSub() throws Exception {
       int numMessages = 100;
       Topic topic = createTopic(pubAddress);