You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/17 18:04:45 UTC

[activemq-artemis] branch master updated: ARTEMIS-2493 OpenWire session close doesn't cleanup consumer refs

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 05a9331  ARTEMIS-2493 OpenWire session close doesn't cleanup consumer refs
     new 3cbd5a3  This closes #2840
05a9331 is described below

commit 05a93314cd038e0d929109e3adaf044f514fdc5f
Author: Howard Gao <ho...@gmail.com>
AuthorDate: Tue Sep 17 09:44:25 2019 +0800

    ARTEMIS-2493 OpenWire session close doesn't cleanup consumer refs
    
    When an openwire client closes the session, the broker doesn't
    clean up its server consumer references even though the core
    consumers are closed. This results in a leak when sessions within
    a connection are created and closed while the connection stays open.
---
 .../core/protocol/openwire/OpenWireConnection.java | 15 ++++----
 .../protocol/openwire/OpenWireProtocolManager.java |  4 +++
 .../integration/openwire/SimpleOpenWireTest.java   | 42 ++++++++++++++++++++--
 3 files changed, 51 insertions(+), 10 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 6c846ba..9654a3e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1178,16 +1178,15 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          // Don't let new consumers or producers get added while we are closing
          // this down.
          session.shutdown();
-         // Cascade the connection stop producers.
-         // we don't stop consumer because in core
-         // closing the session will do the job
+
          for (ProducerId producerId : session.getProducerIds()) {
-            try {
-               processRemoveProducer(producerId);
-            } catch (Throwable e) {
-               // LOG.warn("Failed to remove producer: {}", producerId, e);
-            }
+            processRemoveProducer(producerId);
          }
+
+         for (ConsumerId consumerId : session.getConsumerIds()) {
+            processRemoveConsumer(consumerId, lastDeliveredSequenceId);
+         }
+
          state.removeSession(id);
          propagateLastSequenceId(session, lastDeliveredSequenceId);
          removeSession(context, session.getInfo());
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 44cc8da..efe7801 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -682,4 +682,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
       }
       return mappedDestination;
    }
+
+   public List<OpenWireConnection> getConnections() {
+      return connections;
+   }
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index 77efd28..d2d1d0c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -47,6 +47,7 @@ import javax.jms.XASession;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -64,12 +65,17 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.state.SessionState;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -244,8 +250,6 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
 
          Connection connection = factory.createConnection();
 
-         Collection<Session> sessions = new LinkedList<>();
-
          Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
          Queue queue = session.createQueue(queueName);
          System.out.println("Queue:" + queue);
@@ -271,6 +275,40 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
    }
 
    @Test
+   public void testSessionCloseWithOpenConnection() throws Exception {
+      try (Connection connection = factory.createConnection()) {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+
+         session.createConsumer(queue);
+         session.createConsumer(queue);
+
+         connection.start();
+
+         Field infoField = ActiveMQSession.class.getDeclaredField("info");
+         infoField.setAccessible(true);
+         SessionInfo info = (SessionInfo) infoField.get(session);
+
+         NettyAcceptor acceptor = (NettyAcceptor) server.getRemotingService().getAcceptor("netty");
+         OpenWireProtocolManager protocolManager = (OpenWireProtocolManager) acceptor.getProtocolMap().get("OPENWIRE");
+
+         List<OpenWireConnection> connections = protocolManager.getConnections();
+         assertEquals(1, connections.size());
+
+         OpenWireConnection conn = connections.get(0);
+
+         SessionState sessionState = conn.getState().getSessionState(info.getSessionId());
+
+         Wait.assertEquals(2, sessionState.getConsumerIds()::size, 5000);
+
+         session.close();
+
+         Wait.assertEquals(0, sessionState.getConsumerIds()::size, 5000);
+      }
+   }
+
+   @Test
    public void testRollback() throws Exception {
       try (Connection connection = factory.createConnection()) {
          Session session = connection.createSession(true, Session.SESSION_TRANSACTED);