You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/09/26 22:03:22 UTC
[2/2] activemq-artemis git commit: ARTEMIS-741 fix subscription queue leak on STOMP
ARTEMIS-741 fix subscription queue leak on STOMP
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2dcf8de0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2dcf8de0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2dcf8de0
Branch: refs/heads/master
Commit: 2dcf8de0decf7ef10c190825cc382ac0c601cc69
Parents: c86e41d
Author: jbertram <jb...@apache.com>
Authored: Wed Sep 21 19:19:59 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Sep 26 18:01:30 2016 -0400
----------------------------------------------------------------------
.../core/protocol/stomp/StompSession.java | 30 ++++---
.../core/protocol/stomp/StompSubscription.java | 29 ++++---
.../tests/integration/stomp/StompTest.java | 82 ++++++++++++++++++++
3 files changed, 115 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2dcf8de0/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index ba0abbf..2596b15 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMess
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
@@ -276,7 +275,8 @@ public class StompSession implements SessionCallback {
String destination,
String selector,
String ack) throws Exception {
- SimpleString queue = SimpleString.toSimpleString(destination);
+ SimpleString queueName = SimpleString.toSimpleString(destination);
+ boolean pubSub = false;
int receiveCredits = consumerCredits;
if (ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
receiveCredits = -1;
@@ -284,27 +284,27 @@ public class StompSession implements SessionCallback {
if (destination.startsWith("jms.topic")) {
// subscribes to a topic
+ pubSub = true;
if (durableSubscriptionName != null) {
if (clientID == null) {
throw BUNDLE.missingClientID();
}
- queue = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
- QueueQueryResult query = session.executeQueueQuery(queue);
- if (!query.isExists()) {
- session.createQueue(SimpleString.toSimpleString(destination), queue, SimpleString.toSimpleString(selector), false, true);
+ queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
+ if (manager.getServer().locateQueue(queueName) == null) {
+ session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector), false, true);
}
}
else {
- queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
- session.createQueue(SimpleString.toSimpleString(destination), queue, SimpleString.toSimpleString(selector), true, false);
+ queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
+ session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector), true, false);
}
- ((ServerSessionImpl) session).createConsumer(consumerID, queue, null, false, false, receiveCredits);
+ session.createConsumer(consumerID, queueName, null, false, false, receiveCredits);
}
else {
- ((ServerSessionImpl) session).createConsumer(consumerID, queue, SimpleString.toSimpleString(selector), false, false, receiveCredits);
+ session.createConsumer(consumerID, queueName, SimpleString.toSimpleString(selector), false, false, receiveCredits);
}
- StompSubscription subscription = new StompSubscription(subscriptionID, ack);
+ StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, pubSub);
subscriptions.put(consumerID, subscription);
session.start();
@@ -320,10 +320,9 @@ public class StompSession implements SessionCallback {
StompSubscription sub = entry.getValue();
if (id != null && id.equals(sub.getID())) {
iterator.remove();
+ SimpleString queueName = sub.getQueueName();
session.closeConsumer(consumerID);
- SimpleString queueName = SimpleString.toSimpleString(id);
- QueueQueryResult query = session.executeQueueQuery(queueName);
- if (query.isExists()) {
+ if (sub.isPubSub() && manager.getServer().locateQueue(queueName) != null) {
session.deleteQueue(queueName);
}
result = true;
@@ -332,8 +331,7 @@ public class StompSession implements SessionCallback {
if (!result && durableSubscriptionName != null && clientID != null) {
SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
- QueueQueryResult query = session.executeQueueQuery(queueName);
- if (query.isExists()) {
+ if (manager.getServer().locateQueue(queueName) != null) {
session.deleteQueue(queueName);
}
result = true;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2dcf8de0/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
index 971af27..a1417ad 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp;
+import org.apache.activemq.artemis.api.core.SimpleString;
+
public class StompSubscription {
// Constants -----------------------------------------------------
@@ -25,13 +27,20 @@ public class StompSubscription {
private final String ack;
+ private final SimpleString queueName;
+
+ // whether or not this subscription follows publish/subscribe semantics (e.g. for a JMS topic)
+ private final boolean pubSub;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public StompSubscription(String subID, String ack) {
+ public StompSubscription(String subID, String ack, SimpleString queueName, boolean pubSub) {
this.subID = subID;
this.ack = ack;
+ this.queueName = queueName;
+ this.pubSub = pubSub;
}
// Public --------------------------------------------------------
@@ -44,17 +53,17 @@ public class StompSubscription {
return subID;
}
- @Override
- public String toString() {
- return "StompSubscription[id=" + subID + ", ack=" + ack + "]";
+ public SimpleString getQueueName() {
+ return queueName;
}
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
+ public boolean isPubSub() {
+ return pubSub;
+ }
- // Inner classes -------------------------------------------------
+ @Override
+ public String toString() {
+ return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", pubSub=" + pubSub + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2dcf8de0/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 1c92f42..951aa85 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -1282,6 +1282,7 @@ public class StompTest extends StompTestBase {
@Test
public void testSubscribeToTopic() throws Exception {
+ final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length;
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
sendFrame(frame);
@@ -1301,6 +1302,19 @@ public class StompTest extends StompTestBase {
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("RECEIPT"));
+ assertTrue("Subscription queue should be created here", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisfied() throws Exception {
+ if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) {
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+ }, TimeUnit.SECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100)));
+
sendMessage(getName(), topic);
frame = receiveFrame(10000);
@@ -1326,6 +1340,74 @@ public class StompTest extends StompTestBase {
log.info("Received frame: " + frame);
Assert.assertNull("No message should have been received since subscription was removed", frame);
+ assertEquals("Subscription queue should be deleted", 0, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ @Test
+ public void testSubscribeToQueue() throws Exception {
+ final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length;
+
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "receipt: 12\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ assertFalse("Queue should not be created here", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisfied() throws Exception {
+ if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) {
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+ }, TimeUnit.MILLISECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100)));
+
+ sendMessage(getName(), queue);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ frame = "UNSUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "receipt: 1234\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ // wait for UNSUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ sendMessage(getName(), queue);
+
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.assertNull("No message should have been received since subscription was removed", frame);
+
+ assertEquals("Subscription queue should not be deleted", baselineQueueCount, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length);
+
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
sendFrame(frame);
}