You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/06/17 20:57:42 UTC

[activemq-artemis] 02/02: ARTEMIS-3864 StompTransactions leaking on ActiveMQServer.getSessions()

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit c1fd16d66a506d7878f0da6b71dfeaa096221d8c
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Jun 17 15:33:14 2022 -0400

    ARTEMIS-3864 StompTransactions leaking on ActiveMQServer.getSessions()
    
    After a TX in stomp is committed, a session will never be cleared from ActiveMQServer
---
 .../core/protocol/stomp/StompProtocolManager.java  | 67 ++++++++++++++--------
 .../artemis/core/protocol/stomp/StompSession.java  | 14 +++++
 .../artemis/tests/integration/stomp/StompTest.java | 33 +++++++++++
 3 files changed, 89 insertions(+), 25 deletions(-)

diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 87d5c28ed7..1f9005a1c4 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -19,10 +19,9 @@ package org.apache.activemq.artemis.core.protocol.stomp;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 import io.netty.channel.ChannelPipeline;
@@ -63,7 +62,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
 
    private final Executor executor;
 
-   private final Map<Object, StompSession> transactedSessions = new HashMap<>();
+   // connectionID / Map<SessionId, StompSession>
+   private final Map<Object, Map<Object, StompSession>> transactedSessions = new ConcurrentHashMap<>();
 
    // key => connection ID, value => Stomp session
    private final Map<Object, StompSession> sessions = new HashMap<>();
@@ -218,10 +218,29 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
    }
 
    public StompSession getTransactedSession(StompConnection connection, String txID) throws Exception {
-      return internalGetSession(connection, transactedSessions, txID, true);
+      return internalGetSession(connection, getTXMap(connection.getID()), txID, true);
    }
 
+   public Map<Object, Map<Object, StompSession>> getTransactedSessions() {
+      return transactedSessions;
+   }
+
+   private Map<Object, StompSession> getTXMap(Object objectID) {
+      Map<Object, StompSession> sessions = transactedSessions.get(objectID);
+      if (sessions == null) {
+         sessions = new HashMap<>();
+         Map<Object, StompSession> oldValue = transactedSessions.putIfAbsent(objectID, sessions);
+         if (oldValue != null) {
+            sessions = oldValue;
+         }
+      }
+
+      return sessions;
+   }
+
+
    private StompSession internalGetSession(StompConnection connection, Map<Object, StompSession> sessions, Object id, boolean transacted) throws Exception {
+      System.out.println("Looking for sessionID " + id);
       StompSession stompSession = sessions.get(id);
       if (stompSession == null) {
          stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
@@ -253,21 +272,18 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
                }
             }
 
-            // removed the transacted session belonging to the connection
-            Iterator<Entry<Object, StompSession>> iterator = transactedSessions.entrySet().iterator();
-            while (iterator.hasNext()) {
-               Map.Entry<Object, StompSession> entry = iterator.next();
-               if (entry.getValue().getConnection() == connection) {
-                  ServerSession serverSession = entry.getValue().getCoreSession();
-                  try {
-                     serverSession.rollback(true);
-                     serverSession.close(false);
-                  } catch (Exception e) {
-                     ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e);
-                  }
-                  iterator.remove();
+            Map<Object, StompSession> sessionMap = getTXMap(connection.getID());
+            sessionMap.values().forEach(ss -> {
+               try {
+                  ss.getCoreSession().rollback(false);
+                  ss.getCoreSession().close(false);
+               } catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e);
                }
-            }
+            });
+            sessionMap.clear();
+
+            transactedSessions.remove(connection.getID());
          }
       });
    }
@@ -323,20 +339,20 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
 
    public void commitTransaction(StompConnection connection, String txID) throws Exception {
       StompSession session = getTransactedSession(connection, txID);
-      if (session == null) {
+      if (session == null || !session.isTxPending()) {
          throw new ActiveMQStompException(connection, "No transaction started: " + txID);
       }
-      transactedSessions.remove(txID);
       session.getCoreSession().commit();
+      session.end();
    }
 
    public void abortTransaction(StompConnection connection, String txID) throws Exception {
       StompSession session = getTransactedSession(connection, txID);
-      if (session == null) {
+      if (session == null || !session.isTxPending()) {
          throw new ActiveMQStompException(connection, "No transaction started: " + txID);
       }
-      transactedSessions.remove(txID);
       session.getCoreSession().rollback(false);
+      session.end();
    }
 
    public StompPostReceiptFunction subscribe(StompConnection connection,
@@ -374,12 +390,13 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
 
    public void beginTransaction(StompConnection connection, String txID) throws Exception {
       ActiveMQServerLogger.LOGGER.debugf("-------------------------------Stomp begin tx: %s", txID);
-      if (transactedSessions.containsKey(txID)) {
+      // create the transacted session
+      StompSession session = getTransactedSession(connection, txID);
+      if (session.isTxPending()) {
          ActiveMQServerLogger.LOGGER.stompErrorTXExists(txID);
          throw new ActiveMQStompException(connection, "Transaction already started: " + txID);
       }
-      // create the transacted session
-      getTransactedSession(connection, txID);
+      session.begin();
    }
 
    public boolean destinationExists(String destination) {
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 8cdc9de769..10240f2993 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -72,6 +72,20 @@ public class StompSession implements SessionCallback {
 
    private volatile boolean noLocal = false;
 
+   private boolean txPending = false;
+
+   public synchronized void begin() {
+      txPending = true;
+   }
+
+   public synchronized boolean isTxPending() {
+      return txPending;
+   }
+
+   public synchronized void end() {
+      txPending = false;
+   }
+
    StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext) {
       this.connection = connection;
       this.manager = manager;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 30c37b090d..0075990605 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManager;
 import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -60,6 +61,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.tests.integration.mqtt.FuseMQTTClientProvider;
 import org.apache.activemq.artemis.tests.integration.mqtt.MQTTClientProvider;
 import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@@ -662,6 +664,37 @@ public class StompTest extends StompTestBase {
 
    }
 
+   @Test
+   public void testTransactedSessionLeak() throws Exception {
+      for (int i = 0; i < 10; i++) {
+         conn = StompClientConnectionFactory.createClientConnection(uri);
+         conn.connect(defUser, defPass);
+
+
+         for (int s = 0; s < 10; s++) {
+            String txId = "tx" + i + "_" + s;
+            beginTransaction(conn, txId);
+            send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, txId);
+            commitTransaction(conn, txId, true);
+         }
+
+         Wait.assertEquals(13, () -> server.getSessions().size(), 1000, 100);
+         conn.disconnect();
+      }
+
+      if (connection != null) {
+         connection.close();
+      }
+
+      Wait.assertEquals(0, () -> server.getSessions().size(), 1000, 100);
+
+      Acceptor stompAcceptor = server.getRemotingService().getAcceptors().get("stomp");
+      StompProtocolManager stompProtocolManager = (StompProtocolManager) stompAcceptor.getProtocolHandler().getProtocolMap().get("STOMP");
+      Assert.assertNotNull(stompProtocolManager);
+
+      Assert.assertEquals(0, stompProtocolManager.getTransactedSessions().size());
+   }
+
    @Test
    public void testIngressTimestamp() throws Exception {
       server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true));