You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/17 18:04:45 UTC
[activemq-artemis] branch master updated: ARTEMIS-2493 OpenWire
session close doesn't cleanup consumer refs
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 05a9331 ARTEMIS-2493 OpenWire session close doesn't cleanup consumer refs
new 3cbd5a3 This closes #2840
05a9331 is described below
commit 05a93314cd038e0d929109e3adaf044f514fdc5f
Author: Howard Gao <ho...@gmail.com>
AuthorDate: Tue Sep 17 09:44:25 2019 +0800
ARTEMIS-2493 OpenWire session close doesn't cleanup consumer refs
When an openwire client closes the session, the broker doesn't
clean up its server consumer references even though the core
consumers are closed. This results a leak when sessions within
a connection are created and closed when the connection keeps open.
---
.../core/protocol/openwire/OpenWireConnection.java | 15 ++++----
.../protocol/openwire/OpenWireProtocolManager.java | 4 +++
.../integration/openwire/SimpleOpenWireTest.java | 42 ++++++++++++++++++++--
3 files changed, 51 insertions(+), 10 deletions(-)
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 6c846ba..9654a3e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1178,16 +1178,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
// Don't let new consumers or producers get added while we are closing
// this down.
session.shutdown();
- // Cascade the connection stop producers.
- // we don't stop consumer because in core
- // closing the session will do the job
+
for (ProducerId producerId : session.getProducerIds()) {
- try {
- processRemoveProducer(producerId);
- } catch (Throwable e) {
- // LOG.warn("Failed to remove producer: {}", producerId, e);
- }
+ processRemoveProducer(producerId);
}
+
+ for (ConsumerId consumerId : session.getConsumerIds()) {
+ processRemoveConsumer(consumerId, lastDeliveredSequenceId);
+ }
+
state.removeSession(id);
propagateLastSequenceId(session, lastDeliveredSequenceId);
removeSession(context, session.getInfo());
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 44cc8da..efe7801 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -682,4 +682,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
return mappedDestination;
}
+
+ public List<OpenWireConnection> getConnections() {
+ return connections;
+ }
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index 77efd28..d2d1d0c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -47,6 +47,7 @@ import javax.jms.XASession;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
@@ -64,12 +65,17 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.state.SessionState;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
@@ -244,8 +250,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
Connection connection = factory.createConnection();
- Collection<Session> sessions = new LinkedList<>();
-
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
System.out.println("Queue:" + queue);
@@ -271,6 +275,40 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
@Test
+ public void testSessionCloseWithOpenConnection() throws Exception {
+ try (Connection connection = factory.createConnection()) {
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+
+ session.createConsumer(queue);
+ session.createConsumer(queue);
+
+ connection.start();
+
+ Field infoField = ActiveMQSession.class.getDeclaredField("info");
+ infoField.setAccessible(true);
+ SessionInfo info = (SessionInfo) infoField.get(session);
+
+ NettyAcceptor acceptor = (NettyAcceptor) server.getRemotingService().getAcceptor("netty");
+ OpenWireProtocolManager protocolManager = (OpenWireProtocolManager) acceptor.getProtocolMap().get("OPENWIRE");
+
+ List<OpenWireConnection> connections = protocolManager.getConnections();
+ assertEquals(1, connections.size());
+
+ OpenWireConnection conn = connections.get(0);
+
+ SessionState sessionState = conn.getState().getSessionState(info.getSessionId());
+
+ Wait.assertEquals(2, sessionState.getConsumerIds()::size, 5000);
+
+ session.close();
+
+ Wait.assertEquals(0, sessionState.getConsumerIds()::size, 5000);
+ }
+ }
+
+ @Test
public void testRollback() throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);