You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2018/11/15 20:02:42 UTC
[2/2] activemq-artemis git commit: ARTEMIS-1929 race in STOMP
w/identical durable subs
ARTEMIS-1929 race in STOMP w/identical durable subs
(cherry picked from commit 2e53d8f5fbf60da0aedc53e9da77f7579f301cad)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b3d700a6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b3d700a6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b3d700a6
Branch: refs/heads/2.6.x
Commit: b3d700a66e7de13af786c1e9ed7a8ccb24e4e53a
Parents: e334da3
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Sep 25 16:41:27 2018 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Nov 15 14:02:04 2018 -0600
----------------------------------------------------------------------
.../core/protocol/stomp/StompSession.java | 7 +-
.../integration/stomp/v12/StompV12Test.java | 83 ++++++++++++++++++++
2 files changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b3d700a6/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index e370c81..80e1d1c 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
@@ -269,7 +270,11 @@ public class StompSession implements SessionCallback {
}
queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
if (manager.getServer().locateQueue(queueName) == null) {
- session.createQueue(address, queueName, selectorSimple, false, true);
+ try {
+ session.createQueue(address, queueName, selectorSimple, false, true);
+ } catch (ActiveMQQueueExistsException e) {
+ // ignore; can be caused by concurrent durable subscribers
+ }
}
} else {
queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b3d700a6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
index d94d7c1..5e18308 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
@@ -32,10 +32,12 @@ import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
@@ -1452,6 +1454,87 @@ public class StompV12Test extends StompTestBase {
}
@Test
+   public void testMultipleDurableSubscribers() throws Exception {
+      org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.TRACE);
+      conn.connect(defUser, defPass, "myClientID");
+      StompClientConnectionV12 conn2 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
+      conn2.connect(defUser, defPass, "myClientID");
+      // Same client ID on both connections; getName() is presumably the shared durable subscription name.
+      subscribe(conn, UUID.randomUUID().toString(), "client-individual", getName());
+      subscribe(conn2, UUID.randomUUID().toString(), "client-individual", getName());
+      // Drop both transports and wait (via waitDisconnect) for each connection to go away.
+      conn.closeTransport();
+      waitDisconnect(conn);
+      conn2.closeTransport();
+      waitDisconnect(conn2);
+   }
+
+ @Test
+   public void testMultipleConcurrentDurableSubscribers() throws Exception {
+      org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.TRACE);
+      // Each worker checks in on "ready" and then parks on "go", so all of them are released together.
+      final int subscriberCount = 25;
+      final CountDownLatch go = new CountDownLatch(1);
+      final CountDownLatch ready = new CountDownLatch(subscriberCount);
+      final SubscriberThread[] workers = new SubscriberThread[subscriberCount];
+      // Build every worker with its own client connection before any of them is started.
+      for (int index = 0; index < subscriberCount; index++) {
+         StompClientConnection clientConnection = StompClientConnectionFactory.createClientConnection(uri);
+         workers[index] = new SubscriberThread("subscriber::" + index, clientConnection, go, ready);
+      }
+      for (int index = 0; index < subscriberCount; index++) {
+         workers[index].start();
+      }
+      // Block until all workers have checked in, then open the gate.
+      ready.await();
+      go.countDown();
+      // Join each worker in turn and require that it recorded no errors.
+      for (int index = 0; index < subscriberCount; index++) {
+         SubscriberThread worker = workers[index];
+         worker.join();
+         Assert.assertEquals(0, worker.errors.get());
+      }
+   }
+
+   class SubscriberThread extends Thread {
+      final StompClientConnection connection;
+      final CountDownLatch startFlag;
+      final CountDownLatch alignFlag;
+      // Incremented when the subscribe reply is an ERROR frame or when connect/subscribe throws.
+      final AtomicInteger errors = new AtomicInteger(0);
+      SubscriberThread(String name, StompClientConnection connection, CountDownLatch startFlag, CountDownLatch alignFlag) {
+         super(name);
+         this.startFlag = startFlag;
+         this.alignFlag = alignFlag;
+         this.connection = connection;
+      }
+      // Checks in on alignFlag, waits for startFlag, then connects and subscribes; always disconnects afterwards.
+      @Override
+      public void run() {
+         try {
+            alignFlag.countDown();
+            startFlag.await();
+            connection.connect(defUser, defPass, "myClientID");
+            final ClientStompFrame reply = subscribeTopic(connection, UUID.randomUUID().toString(), "client-individual", "123");
+            final boolean rejected = reply.getCommand().equals(Stomp.Responses.ERROR);
+            if (rejected) {
+               errors.incrementAndGet();
+            }
+         } catch (Exception failure) {
+            failure.printStackTrace();
+            errors.incrementAndGet();
+         } finally {
+            try {
+               connection.disconnect();
+               waitDisconnect((StompClientConnectionV12) connection);
+            } catch (Exception failure) {
+               failure.printStackTrace();
+            }
+         }
+      }
+   }
+
+ @Test
public void testDurableSubscriberWithReconnection() throws Exception {
conn.connect(defUser, defPass, CLIENT_ID);