You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/06/17 20:57:42 UTC
[activemq-artemis] 02/02: ARTEMIS-3864 StompTransactions leaking on ActiveMQServer.getSessions()
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit c1fd16d66a506d7878f0da6b71dfeaa096221d8c
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Jun 17 15:33:14 2022 -0400
ARTEMIS-3864 StompTransactions leaking on ActiveMQServer.getSessions()
After a TX in stomp is committed, a session will never be cleared from ActiveMQServer
---
.../core/protocol/stomp/StompProtocolManager.java | 67 ++++++++++++++--------
.../artemis/core/protocol/stomp/StompSession.java | 14 +++++
.../artemis/tests/integration/stomp/StompTest.java | 33 +++++++++++
3 files changed, 89 insertions(+), 25 deletions(-)
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 87d5c28ed7..1f9005a1c4 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -19,10 +19,9 @@ package org.apache.activemq.artemis.core.protocol.stomp;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import io.netty.channel.ChannelPipeline;
@@ -63,7 +62,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
private final Executor executor;
- private final Map<Object, StompSession> transactedSessions = new HashMap<>();
+ // connectionID / Map<SessionId, StompSession>
+ private final Map<Object, Map<Object, StompSession>> transactedSessions = new ConcurrentHashMap<>();
// key => connection ID, value => Stomp session
private final Map<Object, StompSession> sessions = new HashMap<>();
@@ -218,10 +218,29 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
}
public StompSession getTransactedSession(StompConnection connection, String txID) throws Exception {
- return internalGetSession(connection, transactedSessions, txID, true);
+ return internalGetSession(connection, getTXMap(connection.getID()), txID, true);
}
+ public Map<Object, Map<Object, StompSession>> getTransactedSessions() {
+ return transactedSessions;
+ }
+
+ private Map<Object, StompSession> getTXMap(Object objectID) {
+ Map<Object, StompSession> sessions = transactedSessions.get(objectID);
+ if (sessions == null) {
+ sessions = new HashMap<>();
+ Map<Object, StompSession> oldValue = transactedSessions.putIfAbsent(objectID, sessions);
+ if (oldValue != null) {
+ sessions = oldValue;
+ }
+ }
+
+ return sessions;
+ }
+
+
private StompSession internalGetSession(StompConnection connection, Map<Object, StompSession> sessions, Object id, boolean transacted) throws Exception {
+ System.out.println("Looking for sessionID " + id);
StompSession stompSession = sessions.get(id);
if (stompSession == null) {
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
@@ -253,21 +272,18 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
}
}
- // removed the transacted session belonging to the connection
- Iterator<Entry<Object, StompSession>> iterator = transactedSessions.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Object, StompSession> entry = iterator.next();
- if (entry.getValue().getConnection() == connection) {
- ServerSession serverSession = entry.getValue().getCoreSession();
- try {
- serverSession.rollback(true);
- serverSession.close(false);
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e);
- }
- iterator.remove();
+ Map<Object, StompSession> sessionMap = getTXMap(connection.getID());
+ sessionMap.values().forEach(ss -> {
+ try {
+ ss.getCoreSession().rollback(false);
+ ss.getCoreSession().close(false);
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e);
}
- }
+ });
+ sessionMap.clear();
+
+ transactedSessions.remove(connection.getID());
}
});
}
@@ -323,20 +339,20 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
public void commitTransaction(StompConnection connection, String txID) throws Exception {
StompSession session = getTransactedSession(connection, txID);
- if (session == null) {
+ if (session == null || !session.isTxPending()) {
throw new ActiveMQStompException(connection, "No transaction started: " + txID);
}
- transactedSessions.remove(txID);
session.getCoreSession().commit();
+ session.end();
}
public void abortTransaction(StompConnection connection, String txID) throws Exception {
StompSession session = getTransactedSession(connection, txID);
- if (session == null) {
+ if (session == null || !session.isTxPending()) {
throw new ActiveMQStompException(connection, "No transaction started: " + txID);
}
- transactedSessions.remove(txID);
session.getCoreSession().rollback(false);
+ session.end();
}
public StompPostReceiptFunction subscribe(StompConnection connection,
@@ -374,12 +390,13 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
public void beginTransaction(StompConnection connection, String txID) throws Exception {
ActiveMQServerLogger.LOGGER.debugf("-------------------------------Stomp begin tx: %s", txID);
- if (transactedSessions.containsKey(txID)) {
+ // create the transacted session
+ StompSession session = getTransactedSession(connection, txID);
+ if (session.isTxPending()) {
ActiveMQServerLogger.LOGGER.stompErrorTXExists(txID);
throw new ActiveMQStompException(connection, "Transaction already started: " + txID);
}
- // create the transacted session
- getTransactedSession(connection, txID);
+ session.begin();
}
public boolean destinationExists(String destination) {
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 8cdc9de769..10240f2993 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -72,6 +72,20 @@ public class StompSession implements SessionCallback {
private volatile boolean noLocal = false;
+ private boolean txPending = false;
+
+ public synchronized void begin() {
+ txPending = true;
+ }
+
+ public synchronized boolean isTxPending() {
+ return txPending;
+ }
+
+ public synchronized void end() {
+ txPending = false;
+ }
+
StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext) {
this.connection = connection;
this.manager = manager;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 30c37b090d..0075990605 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManager;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
@@ -60,6 +61,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.tests.integration.mqtt.FuseMQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.MQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@@ -662,6 +664,37 @@ public class StompTest extends StompTestBase {
}
+ @Test
+ public void testTransactedSessionLeak() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ conn = StompClientConnectionFactory.createClientConnection(uri);
+ conn.connect(defUser, defPass);
+
+
+ for (int s = 0; s < 10; s++) {
+ String txId = "tx" + i + "_" + s;
+ beginTransaction(conn, txId);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, txId);
+ commitTransaction(conn, txId, true);
+ }
+
+ Wait.assertEquals(13, () -> server.getSessions().size(), 1000, 100);
+ conn.disconnect();
+ }
+
+ if (connection != null) {
+ connection.close();
+ }
+
+ Wait.assertEquals(0, () -> server.getSessions().size(), 1000, 100);
+
+ Acceptor stompAcceptor = server.getRemotingService().getAcceptors().get("stomp");
+ StompProtocolManager stompProtocolManager = (StompProtocolManager) stompAcceptor.getProtocolHandler().getProtocolMap().get("STOMP");
+ Assert.assertNotNull(stompProtocolManager);
+
+ Assert.assertEquals(0, stompProtocolManager.getTransactedSessions().size());
+ }
+
@Test
public void testIngressTimestamp() throws Exception {
server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true));