You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/06/17 20:57:40 UTC

[activemq-artemis] branch main updated (196e604778 -> c1fd16d66a)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


    from 196e604778 NO-JIRA: suppress significant teardown stacktraces after compatibility test that is a no-op...make test itself skip
     new 9dd026118e ARTEMIS-3862 Short lived subscriptiong makes address size inconsistent
     new c1fd16d66a ARTEMIS-3864 StompTransactions leaking on ActiveMQServer.getSessions()

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../core/protocol/stomp/StompProtocolManager.java  |  67 +++++---
 .../artemis/core/protocol/stomp/StompSession.java  |  14 ++
 .../artemis/core/server/impl/QueueImpl.java        |  46 ++++++
 .../client/RemoveSubscriptionRaceTest.java         | 172 +++++++++++++++++++++
 .../tests/integration/paging/PagingTest.java       |   2 +-
 .../artemis/tests/integration/stomp/StompTest.java |  33 ++++
 6 files changed, 308 insertions(+), 26 deletions(-)
 create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java


[activemq-artemis] 01/02: ARTEMIS-3862 Short lived subscriptiong makes address size inconsistent

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 9dd026118e19e9a07b89661412c4ab48a08f73ad
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Jun 16 22:53:07 2022 -0400

    ARTEMIS-3862 Short lived subscriptiong makes address size inconsistent
---
 .../artemis/core/server/impl/QueueImpl.java        |  46 ++++++
 .../client/RemoveSubscriptionRaceTest.java         | 172 +++++++++++++++++++++
 .../tests/integration/paging/PagingTest.java       |   2 +-
 3 files changed, 219 insertions(+), 1 deletion(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e17fd6cb92..4068eafbcc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2910,6 +2910,37 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
+   /**
+    * Drops the non-paged references still sitting on a queue that was already destroyed.
+    * It is meant for the async delivery path only, covering the race where sends get
+    * processed after deleteQueue already happened.
+    */
+   private void removeMessagesWhileDelivering() throws Exception {
+      assert queueDestroyed : "Method to be used only when the queue was destroyed";
+      final Transaction killTx = new TransactionImpl(storageManager);
+      int killed = 0;
+
+      try (LinkedListIterator<MessageReference> references = iterator()) {
+         while (references.hasNext()) {
+            final MessageReference reference = references.next();
+
+            // the queue is being removed, so paged references just go away through
+            // page cleanup; only the remaining ones are acked and dropped here
+            if (!reference.isPaged()) {
+               acknowledge(killTx, reference, AckReason.KILLED, null);
+               references.remove();
+               refRemoved(reference);
+               killed++;
+            }
+         }
+
+         // commit only when at least one reference was acked
+         if (killed > 0) {
+            killTx.commit();
+         }
+      }
+   }
+
    /**
     * This method will deliver as many messages as possible until all consumers are busy or there
     * are no more matching or available messages.
@@ -2960,6 +2991,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
          synchronized (this) {
 
+            if (queueDestroyed) {
+               if (messageReferences.size() == 0) {
+                  return false;
+               }
+               try {
+                  removeMessagesWhileDelivering();
+               } catch (Exception e) {
+                  logger.warn(e.getMessage(), e);
+               }
+               return false;
+            }
+
             // Need to do these checks inside the synchronized
             if (isPaused() || !canDispatch()) {
                return false;
@@ -3109,6 +3152,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    private void checkDepage() {
+      if (queueDestroyed) {
+         return;
+      }
       if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.tryNext() != PageIterator.NextResult.noElements) {
          scheduleDepage(false);
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java
new file mode 100644
index 0000000000..0911d69e5b
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/RemoveSubscriptionRaceTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Races short lived topic subscribers against a producer/consumer pair and checks that no message is left behind. */
+public class RemoveSubscriptionRaceTest extends ActiveMQTestBase {
+
+   private static final String SUB_NAME = "SubscriptionStressTest";
+
+   ActiveMQServer server;
+
+   @Before
+   public void setServer() throws Exception {
+      // no shared setup: every test creates and starts its own server in internalTest
+   }
+
+   @Test
+   public void testCreateSubscriptionCoreNoFiles() throws Exception {
+      internalTest("core", false, 5, 1000);
+   }
+
+   @Test
+   public void testCreateSubscriptionAMQPNoFiles() throws Exception {
+      internalTest("amqp", false, 5, 1000);
+   }
+
+   @Test
+   public void testCreateSubscriptionCoreRealFiles() throws Exception {
+      internalTest("core", true, 2, 200);
+   }
+
+   @Test
+   public void testCreateSubscriptionAMQPRealFiles() throws Exception {
+      internalTest("amqp", true, 2, 200);
+   }
+
+   /**
+    * Runs {@code threads} workers that keep opening and closing a consumer on the topic while a
+    * single producer/consumer pair moves {@code numberOfMessages} messages through queue "Sub_1".
+    * Afterwards no ADD_MESSAGE_PROTOCOL record may remain in the journal and the address size
+    * reported by the queue's paging store has to be back at zero.
+    */
+   public void internalTest(String protocol, boolean realFiles, int threads, int numberOfMessages) throws Exception {
+      server = createServer(realFiles, true);
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(SUB_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().addQueueConfiguration(new QueueConfiguration().setName("Sub_1").setAddress(SUB_NAME).setRoutingType(RoutingType.MULTICAST));
+      server.start();
+
+      CountDownLatch runningLatch = new CountDownLatch(threads);
+      AtomicBoolean running = new AtomicBoolean(true);
+      AtomicInteger errors = new AtomicInteger(0);
+
+      ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1, threads)); // I'm using the max here, because I may set threads=0 while hacking the test
+      runAfter(() -> executorService.shutdownNow());
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+      CyclicBarrier flagStart = new CyclicBarrier(threads + 1);
+
+      for (int i = 0; i < threads; i++) {
+         executorService.execute(() -> {
+            try {
+               flagStart.await(10, TimeUnit.SECONDS);
+               for (int n = 0; n < numberOfMessages && running.get(); n++) {
+                  // try-with-resources closes the connection even when the cycle fails half way
+                  try (Connection connection = factory.createConnection()) {
+                     connection.start();
+                     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                     Topic topic = session.createTopic(SUB_NAME);
+                     MessageConsumer consumer = session.createConsumer(topic);
+                     Message message = consumer.receiveNoWait();
+                     if (message != null) {
+                        message.acknowledge();
+                     }
+                  }
+               }
+            } catch (Throwable e) {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            } finally {
+               runningLatch.countDown();
+            }
+         });
+      }
+
+      Queue queue = server.locateQueue("Sub_1");
+      Assert.assertNotNull(queue);
+
+      // the connection is closed before the finally block runs, so workers are only awaited after it is gone
+      try (Connection connection = factory.createConnection()) {
+         connection.start();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(SUB_NAME);
+         MessageProducer producer = session.createProducer(topic);
+         MessageConsumer consumer = session.createConsumer(session.createQueue(SUB_NAME + "::" + "Sub_1"));
+
+         flagStart.await(10, TimeUnit.SECONDS);
+         for (int i = 0; i < numberOfMessages; i++) {
+            producer.send(session.createTextMessage("a"));
+            Assert.assertNotNull(consumer.receive(5000));
+         }
+      } finally {
+         running.set(false);
+         Assert.assertTrue(runningLatch.await(10, TimeUnit.SECONDS));
+      }
+      // worker failures only bump the counter, so they have to be asserted explicitly
+      Assert.assertEquals(0, errors.get());
+
+      Wait.assertEquals(0, this::countAddMessage, 5000, 100);
+
+      Wait.assertEquals(0L, queue.getPagingStore()::getAddressSize, 2000, 100);
+   }
+
+   /** Counts the ADD_MESSAGE_PROTOCOL records left in the journal after a compaction; 0 when the storage manager is not a JournalStorageManager. */
+   int countAddMessage() throws Exception {
+      StorageManager manager = server.getStorageManager();
+      if (!(manager instanceof JournalStorageManager)) {
+         return 0;
+      }
+      ((JournalStorageManager) manager).getMessageJournal().scheduleCompactAndBlock(5_000);
+
+      HashMap<Integer, AtomicInteger> journalCounts = countJournal(server.getConfiguration());
+      AtomicInteger value = journalCounts.get((int) JournalRecordIds.ADD_MESSAGE_PROTOCOL);
+      return value == null ? 0 : value.get();
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 6e1549294f..1214df64f1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -2738,7 +2738,7 @@ public class PagingTest extends ActiveMQTestBase {
       for (int msgCount = 0; msgCount < numberOfMessages; msgCount++) {
          log.debug("Received " + msgCount);
          msgReceived++;
-         ClientMessage msg = consumer.receiveImmediate();
+         ClientMessage msg = consumer.receive(5000);
          if (msg == null) {
             log.debug("It's null. leaving now");
             sessionConsumer.commit();


[activemq-artemis] 02/02: ARTEMIS-3864 StompTransactions leaking on ActiveMQServer.getSessions()

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit c1fd16d66a506d7878f0da6b71dfeaa096221d8c
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Jun 17 15:33:14 2022 -0400

    ARTEMIS-3864 StompTransactions leaking on ActiveMQServer.getSessions()
    
    After a TX in stomp is committed, a session will never be cleared from ActiveMQServer
---
 .../core/protocol/stomp/StompProtocolManager.java  | 67 ++++++++++++++--------
 .../artemis/core/protocol/stomp/StompSession.java  | 14 +++++
 .../artemis/tests/integration/stomp/StompTest.java | 33 +++++++++++
 3 files changed, 89 insertions(+), 25 deletions(-)

diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 87d5c28ed7..1f9005a1c4 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -19,10 +19,9 @@ package org.apache.activemq.artemis.core.protocol.stomp;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 import io.netty.channel.ChannelPipeline;
@@ -63,7 +62,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
 
    private final Executor executor;
 
-   private final Map<Object, StompSession> transactedSessions = new HashMap<>();
+   // connectionID / Map<SessionId, StompSession>
+   private final Map<Object, Map<Object, StompSession>> transactedSessions = new ConcurrentHashMap<>();
 
    // key => connection ID, value => Stomp session
    private final Map<Object, StompSession> sessions = new HashMap<>();
@@ -218,10 +218,29 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
    }
 
    public StompSession getTransactedSession(StompConnection connection, String txID) throws Exception {
-      return internalGetSession(connection, transactedSessions, txID, true);
+      return internalGetSession(connection, getTXMap(connection.getID()), txID, true);
    }
 
+   public Map<Object, Map<Object, StompSession>> getTransactedSessions() {
+      return java.util.Collections.unmodifiableMap(transactedSessions); // read-only live view, so callers cannot alter the registry itself
+   }
+
+   /**
+    * Returns the transacted sessions of one connection, keyed by transaction id.
+    * An empty map is created and registered the first time a connection is seen.
+    *
+    * @param objectID the connection ID used as key on transactedSessions
+    * @return the map registered for that connection, never null
+    */
+   private Map<Object, StompSession> getTXMap(Object objectID) {
+      // computeIfAbsent is atomic on the ConcurrentHashMap backing transactedSessions,
+      // so racing callers all end up with the same inner map
+      return transactedSessions.computeIfAbsent(objectID, ignored -> new HashMap<>());
+   }
+
+
    private StompSession internalGetSession(StompConnection connection, Map<Object, StompSession> sessions, Object id, boolean transacted) throws Exception {
+      ActiveMQServerLogger.LOGGER.debugf("Looking for sessionID %s", id);
       StompSession stompSession = sessions.get(id);
       if (stompSession == null) {
          stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
@@ -253,21 +272,18 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
                }
             }
 
-            // removed the transacted session belonging to the connection
-            Iterator<Entry<Object, StompSession>> iterator = transactedSessions.entrySet().iterator();
-            while (iterator.hasNext()) {
-               Map.Entry<Object, StompSession> entry = iterator.next();
-               if (entry.getValue().getConnection() == connection) {
-                  ServerSession serverSession = entry.getValue().getCoreSession();
-                  try {
-                     serverSession.rollback(true);
-                     serverSession.close(false);
-                  } catch (Exception e) {
-                     ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e);
-                  }
-                  iterator.remove();
+            Map<Object, StompSession> sessionMap = getTXMap(connection.getID());
+            sessionMap.values().forEach(ss -> {
+               try {
+                  ss.getCoreSession().rollback(false);
+                  ss.getCoreSession().close(false);
+               } catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e);
                }
-            }
+            });
+            sessionMap.clear();
+
+            transactedSessions.remove(connection.getID());
          }
       });
    }
@@ -323,20 +339,20 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
 
    public void commitTransaction(StompConnection connection, String txID) throws Exception {
       StompSession session = getTransactedSession(connection, txID);
-      if (session == null) {
+      if (session == null || !session.isTxPending()) {
          throw new ActiveMQStompException(connection, "No transaction started: " + txID);
       }
-      transactedSessions.remove(txID);
       session.getCoreSession().commit();
+      session.end();
    }
 
    public void abortTransaction(StompConnection connection, String txID) throws Exception {
       StompSession session = getTransactedSession(connection, txID);
-      if (session == null) {
+      if (session == null || !session.isTxPending()) {
          throw new ActiveMQStompException(connection, "No transaction started: " + txID);
       }
-      transactedSessions.remove(txID);
       session.getCoreSession().rollback(false);
+      session.end();
    }
 
    public StompPostReceiptFunction subscribe(StompConnection connection,
@@ -374,12 +390,13 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
 
    public void beginTransaction(StompConnection connection, String txID) throws Exception {
       ActiveMQServerLogger.LOGGER.debugf("-------------------------------Stomp begin tx: %s", txID);
-      if (transactedSessions.containsKey(txID)) {
+      // create the transacted session
+      StompSession session = getTransactedSession(connection, txID);
+      if (session.isTxPending()) {
          ActiveMQServerLogger.LOGGER.stompErrorTXExists(txID);
          throw new ActiveMQStompException(connection, "Transaction already started: " + txID);
       }
-      // create the transacted session
-      getTransactedSession(connection, txID);
+      session.begin();
    }
 
    public boolean destinationExists(String destination) {
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 8cdc9de769..10240f2993 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -72,6 +72,20 @@ public class StompSession implements SessionCallback {
 
    private volatile boolean noLocal = false;
 
+   private boolean txPending = false;
+
+   public synchronized void begin() {
+      this.txPending = true; // a transaction is now open on this session
+   }
+
+   public synchronized boolean isTxPending() {
+      return this.txPending; // true between begin() and end()
+   }
+
+   public synchronized void end() {
+      this.txPending = false; // the transaction is over; begin() may be called again
+   }
+
    StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext) {
       this.connection = connection;
       this.manager = manager;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 30c37b090d..0075990605 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManager;
 import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -60,6 +61,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.tests.integration.mqtt.FuseMQTTClientProvider;
 import org.apache.activemq.artemis.tests.integration.mqtt.MQTTClientProvider;
 import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@@ -662,6 +664,37 @@ public class StompTest extends StompTestBase {
 
    }
 
+   /** Commits ten transactions on each of ten STOMP connections, then expects the server session list and the transacted-session registry to be empty. */
+   @Test
+   public void testTransactedSessionLeak() throws Exception {
+      for (int connIndex = 0; connIndex < 10; connIndex++) {
+         conn = StompClientConnectionFactory.createClientConnection(uri);
+         conn.connect(defUser, defPass);
+
+         for (int txIndex = 0; txIndex < 10; txIndex++) {
+            final String txId = "tx" + connIndex + "_" + txIndex;
+            beginTransaction(conn, txId);
+            send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, txId);
+            commitTransaction(conn, txId, true);
+         }
+
+         Wait.assertEquals(13, () -> server.getSessions().size(), 1000, 100);
+         conn.disconnect();
+      }
+
+      if (connection != null) {
+         connection.close();
+      }
+
+      Wait.assertEquals(0, () -> server.getSessions().size(), 1000, 100);
+
+      Acceptor acceptor = server.getRemotingService().getAcceptors().get("stomp");
+      StompProtocolManager manager = (StompProtocolManager) acceptor.getProtocolHandler().getProtocolMap().get("STOMP");
+      Assert.assertNotNull(manager);
+
+      Assert.assertEquals(0, manager.getTransactedSessions().size());
+   }
+
    @Test
    public void testIngressTimestamp() throws Exception {
       server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true));