You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/03/13 15:33:49 UTC
[1/2] activemq-artemis git commit: This closes #1090
Repository: activemq-artemis
Updated Branches:
refs/heads/master e13e014c6 -> c54dfd305
This closes #1090
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c54dfd30
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c54dfd30
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c54dfd30
Branch: refs/heads/master
Commit: c54dfd3055e443ffe0021dcdb9a49aa58bbaffdf
Parents: e13e014 b5b6e4b
Author: Justin Bertram <jb...@apache.org>
Authored: Mon Mar 13 10:33:13 2017 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Mon Mar 13 10:33:13 2017 -0500
----------------------------------------------------------------------
.../amqp/proton/ProtonServerSenderContext.java | 27 ++++++++++++--------
.../integration/amqp/ProtonPubSubTest.java | 14 ++++++++++
2 files changed, 31 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-1034 - non-durable subscription queue not ended on link close
Posted by jb...@apache.org.
ARTEMIS-1034 - non-durable subscription queue not ended on link close
https://issues.apache.org/jira/browse/ARTEMIS-1034
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b5b6e4be
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b5b6e4be
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b5b6e4be
Branch: refs/heads/master
Commit: b5b6e4bea662a3eb7ab28f9b9ebb4cdb04801805
Parents: e13e014
Author: Andy Taylor <an...@gmail.com>
Authored: Mon Mar 13 13:52:28 2017 +0000
Committer: Justin Bertram <jb...@apache.org>
Committed: Mon Mar 13 10:33:13 2017 -0500
----------------------------------------------------------------------
.../amqp/proton/ProtonServerSenderContext.java | 27 ++++++++++++--------
.../integration/amqp/ProtonPubSubTest.java | 14 ++++++++++
2 files changed, 31 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5b6e4be/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 55ad550..962110e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -94,6 +94,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private boolean shared = false;
private boolean global = false;
private boolean isVolatile = false;
+ private String tempQueueName;
public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, AMQPSessionCallback server) {
super();
@@ -223,6 +224,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// if dynamic we have to create the node (queue) and set the address on the target, the
// node is temporary and will be deleted on closing of the session
queue = java.util.UUID.randomUUID().toString();
+ tempQueueName = queue;
try {
sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
// protonSession.getServerSession().createQueue(queue, queue, null, true, false);
@@ -342,6 +344,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
} else {
queue = java.util.UUID.randomUUID().toString();
+ tempQueueName = queue;
try {
sessionSPI.createTemporaryQueue(source.getAddress(), queue, RoutingType.MULTICAST, selector);
} catch (Exception e) {
@@ -445,16 +448,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (result.isExists() && source.getDynamic()) {
sessionSPI.deleteQueue(queueName);
} else {
- String clientId = getClientId();
- String pubId = sender.getName();
- if (pubId.contains("|")) {
- pubId = pubId.split("\\|")[0];
- }
- String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
- result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
- //only delete if it isn't volatile and has no consumers
- if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
- sessionSPI.deleteQueue(queue);
+ if (source.getDurable() == TerminusDurability.NONE && tempQueueName != null && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
+ sessionSPI.removeTemporaryQueue(tempQueueName);
+ } else {
+ String clientId = getClientId();
+ String pubId = sender.getName();
+ if (pubId.contains("|")) {
+ pubId = pubId.split("\\|")[0];
+ }
+ String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
+ result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
+ //only delete if it isn't volatile and has no consumers
+ if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
+ sessionSPI.deleteQueue(queue);
+ }
}
}
} else if (source != null && source.getDynamic() && (source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b5b6e4be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
index 2ae9b8d..42f30ac 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.java
@@ -31,6 +31,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
@@ -107,6 +108,19 @@ public class ProtonPubSubTest extends ProtonTestBase {
}
@Test
+ public void testNonDurablePubSubQueueDeleted() throws Exception {
+ int numMessages = 100;
+ Topic topic = createTopic(pubAddress);
+ TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer sub = session.createSubscriber(topic);
+ Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(pubAddress));
+ assertEquals(2, bindingsForAddress.getBindings().size());
+ sub.close();
+ Thread.sleep(1000);
+ assertEquals(1, bindingsForAddress.getBindings().size());
+ }
+
+ @Test
public void testNonDurableMultiplePubSub() throws Exception {
int numMessages = 100;
Topic topic = createTopic(pubAddress);