You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2018/11/15 20:02:42 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1929 race in STOMP w/identical durable subs

ARTEMIS-1929 race in STOMP w/identical durable subs

(cherry picked from commit 2e53d8f5fbf60da0aedc53e9da77f7579f301cad)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b3d700a6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b3d700a6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b3d700a6

Branch: refs/heads/2.6.x
Commit: b3d700a66e7de13af786c1e9ed7a8ccb24e4e53a
Parents: e334da3
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Sep 25 16:41:27 2018 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Nov 15 14:02:04 2018 -0600

----------------------------------------------------------------------
 .../core/protocol/stomp/StompSession.java       |  7 +-
 .../integration/stomp/v12/StompV12Test.java     | 83 ++++++++++++++++++++
 2 files changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b3d700a6/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index e370c81..80e1d1c 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
@@ -269,7 +270,11 @@ public class StompSession implements SessionCallback {
             }
             queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
             if (manager.getServer().locateQueue(queueName) == null) {
-               session.createQueue(address, queueName, selectorSimple, false, true);
+               try {
+                  session.createQueue(address, queueName, selectorSimple, false, true);
+               } catch (ActiveMQQueueExistsException e) {
+                  // ignore; can be caused by concurrent durable subscribers
+               }
             }
          } else {
             queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b3d700a6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
index d94d7c1..5e18308 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
@@ -32,10 +32,12 @@ import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
@@ -1452,6 +1454,87 @@ public class StompV12Test extends StompTestBase {
    }
 
    @Test
+   public void testMultipleDurableSubscribers() throws Exception {
+      org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.TRACE);
+      conn.connect(defUser, defPass, "myClientID");
+      StompClientConnectionV12 conn2 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection(uri);
+      conn2.connect(defUser, defPass, "myClientID");
+
+      subscribe(conn, UUID.randomUUID().toString(), "client-individual", getName());
+      subscribe(conn2, UUID.randomUUID().toString(), "clientindividual", getName());
+
+      conn.closeTransport();
+      waitDisconnect(conn);
+      conn2.closeTransport();
+      waitDisconnect(conn2);
+   }
+
+   @Test
+   public void testMultipleConcurrentDurableSubscribers() throws Exception {
+      org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.TRACE);
+
+      int NUMBER_OF_THREADS = 25;
+      SubscriberThread[] threads = new SubscriberThread[NUMBER_OF_THREADS];
+      final CountDownLatch startFlag = new CountDownLatch(1);
+      final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS);
+
+      for (int i = 0; i < threads.length; i++) {
+         threads[i] = new SubscriberThread("subscriber::" + i, StompClientConnectionFactory.createClientConnection(uri), startFlag, alignFlag);
+      }
+
+      for (SubscriberThread t : threads) {
+         t.start();
+      }
+
+      alignFlag.await();
+
+      startFlag.countDown();
+
+      for (SubscriberThread t : threads) {
+         t.join();
+         Assert.assertEquals(0, t.errors.get());
+      }
+   }
+
+   class SubscriberThread extends Thread {
+      final StompClientConnection connection;
+      final CountDownLatch startFlag;
+      final CountDownLatch alignFlag;
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      SubscriberThread(String name, StompClientConnection connection, CountDownLatch startFlag, CountDownLatch alignFlag) {
+         super(name);
+         this.connection = connection;
+         this.startFlag = startFlag;
+         this.alignFlag = alignFlag;
+      }
+
+      @Override
+      public void run() {
+         try {
+            alignFlag.countDown();
+            startFlag.await();
+            connection.connect(defUser, defPass, "myClientID");
+            ClientStompFrame frame = subscribeTopic(connection, UUID.randomUUID().toString(), "client-individual", "123");
+            if (frame.getCommand().equals(Stomp.Responses.ERROR)) {
+
+               errors.incrementAndGet();
+            }
+         } catch (Exception e) {
+            e.printStackTrace();
+            errors.incrementAndGet();
+         } finally {
+            try {
+               connection.disconnect();
+               waitDisconnect((StompClientConnectionV12) connection);
+            } catch (Exception e) {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+
+   @Test
    public void testDurableSubscriberWithReconnection() throws Exception {
       conn.connect(defUser, defPass, CLIENT_ID);