You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/12/05 15:12:39 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1526 race condition between listConsumers() and closing a Session. When session not found, ignore that consumer and continue.

ARTEMIS-1526 race condition between listConsumers() and closing a Session. When session not found, ignore that consumer and continue.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7d619697
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7d619697
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7d619697

Branch: refs/heads/master
Commit: 7d61969795a9b55126818233903811bf6a1c30de
Parents: 8f9bab6
Author: Pat Fox <pa...@gmail.com>
Authored: Sun Nov 26 15:00:23 2017 +0100
Committer: Justin Bertram <jb...@apache.org>
Committed: Tue Dec 5 09:10:40 2017 -0600

----------------------------------------------------------------------
 .../core/management/impl/view/ConsumerView.java | 18 +++-
 .../ActiveMQServerControlMultiThreadTest.java   | 93 +++++++++++++++++++-
 2 files changed, 108 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7d619697/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
index a54e40b..386425a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
@@ -45,7 +45,23 @@ public class ConsumerView extends ActiveMQAbstractView<ServerConsumer> {
    @Override
    public JsonObjectBuilder toJson(ServerConsumer consumer) {
       ServerSession session = server.getSessionByID(consumer.getSessionID());
-      JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("sequentialID", toString(consumer.getSequentialID())).add("sessionName", toString(consumer.getSessionName())).add("connectionClientID", toString(consumer.getConnectionClientID())).add("user", toString(session.getUsername())).add("connectionProtocolName", toString(consumer.getConnectionProtocolName())).add("queueName", toString(consumer.getQueueName())).add("queueType", toString(consumer.getQueueType()).toLowerCase()).add("queueAddress", toString(consumer.getQueueAddress().toString())).add("connectionLocalAddress", toString(consumer.getConnectionLocalAddress())).add("connectionRemoteAddress", toString(consumer.getConnectionRemoteAddress())).add("creationTime", new Date(consumer.getCreationTime()).toString());
+
+      // if the session is no longer available, the consumer is not in a valid state - skip it
+      if (session == null) {
+         return null;
+      }
+
+      JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("sequentialID", toString(consumer.getSequentialID()))
+         .add("sessionName", toString(consumer.getSessionName()))
+         .add("connectionClientID", toString(consumer.getConnectionClientID()))
+         .add("user", toString(session.getUsername()))
+         .add("connectionProtocolName", toString(consumer.getConnectionProtocolName()))
+         .add("queueName", toString(consumer.getQueueName()))
+         .add("queueType", toString(consumer.getQueueType()).toLowerCase())
+         .add("queueAddress", toString(consumer.getQueueAddress().toString()))
+         .add("connectionLocalAddress", toString(consumer.getConnectionLocalAddress()))
+         .add("connectionRemoteAddress", toString(consumer.getConnectionRemoteAddress()))
+         .add("creationTime", new Date(consumer.getCreationTime()).toString());
       return obj;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7d619697/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java
index 2735d04..6f4b886 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java
@@ -27,9 +27,15 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.JsonUtil;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
 import org.apache.activemq.artemis.tests.integration.management.ManagementTestBase;
 import org.jboss.byteman.contrib.bmunit.BMRule;
@@ -57,8 +63,7 @@ public class ActiveMQServerControlMultiThreadTest extends ManagementTestBase {
     */
 
    @Test
-   @BMRules(rules = {@BMRule(
-      name = "Delay listAddress() by 2 secs ",
+   @BMRules(rules = {@BMRule(name = "Delay listAddress() by 2 secs ",
       targetClass = "org.apache.activemq.artemis.core.management.impl.view.AddressView ",
       targetMethod = "<init>(org.apache.activemq.artemis.core.server.ActiveMQServer)",
       targetLocation = "ENTRY",
@@ -109,6 +114,90 @@ public class ActiveMQServerControlMultiThreadTest extends ManagementTestBase {
       }
    }
 
+   /**
+    * Aim: verify that no exception occurs when a session is closed during a listConsumers() operation.
+    *
+    * The test delays listConsumers() BEFORE the Session information associated with the consumer is retrieved.
+    * During this delay the client session is closed.
+    *
+    * @throws Exception
+    */
+
+   @Test
+   @BMRules(rules = {@BMRule(name = "Delay listConsumers() by 2 secs ",
+      targetClass = "org.apache.activemq.artemis.core.management.impl.view.ConsumerView",
+      targetMethod = "toJson(org.apache.activemq.artemis.core.server.ServerConsumer)",
+      targetLocation = "ENTRY",
+      action = "org.apache.activemq.artemis.tests.extras.byteman.ActiveMQServerControlMultiThreadTest.delay(2)")})
+
+   public void listConsumersDuringSessionClose() throws Exception {
+
+      ExecutorService executorService = Executors.newFixedThreadPool(1);
+      SimpleString addressName1 = new SimpleString("MyAddress_one");
+      SimpleString queueName1 = new SimpleString("my_queue_one");
+
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST));
+      server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, false, false);
+
+      // create a consumer
+      try (ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory csf = createSessionFactory(locator);
+           ClientSession session = csf.createSession()) {
+
+         ClientConsumer consumer1_q1 = session.createConsumer(queueName1);
+
+         // add another consumer (on separate session)
+         ClientSession session_two = csf.createSession();
+         ClientConsumer consumer2_q1 = session_two.createConsumer(queueName1);
+
+         //first(normal) invocation - ensure 2 consumers returned
+         //used to block thread, until the delay() has been called.
+         delayCalled = new CountDownLatch(1);
+
+         String consumersAsJsonString = serverControl.listConsumers(createJsonFilter("", "", ""), 1, 10);
+
+         JsonObject consumersAsJsonObject = JsonUtil.readJsonObject(consumersAsJsonString);
+         JsonArray consumersArray = (JsonArray) consumersAsJsonObject.get("data");
+
+         Assert.assertEquals("number of  consumers returned from query", 2, consumersArray.size());
+         Assert.assertEquals("check consumer's queue", queueName1.toString(), consumersArray.getJsonObject(0).getString("queueName"));
+         Assert.assertNotEquals("check session", "", consumersArray.getJsonObject(0).getString("sessionName"));
+
+         //second invocation - close session during listConsumers()
+
+         //used to block thread, until the delay() has been called.
+         delayCalled = new CountDownLatch(1);
+
+         executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+               try {
+                  //wait until the delay occurs and close the session.
+                  delayCalled.await();
+                  session.close();
+               } catch (Exception e) {
+                  e.printStackTrace();
+               }
+
+            }
+         });
+
+         consumersAsJsonString = serverControl.listConsumers(createJsonFilter("", "", ""), 1, 10);
+
+         consumersAsJsonObject = JsonUtil.readJsonObject(consumersAsJsonString);
+         consumersArray = (JsonArray) consumersAsJsonObject.get("data");
+
+         // session is closed before Json string is created - should only be one consumer returned
+         Assert.assertEquals("number of  consumers returned from query", 1, consumersArray.size());
+         Assert.assertEquals("check consumer's queue", queueName1.toString(), consumersArray.getJsonObject(0).getString("queueName"));
+         Assert.assertNotEquals("check session", "", consumersArray.getJsonObject(0).getString("sessionName"));
+
+      } finally {
+         executorService.shutdown();
+      }
+   }
+
    //notify delay has been called and wait for X seconds
    public static void delay(int seconds) {
       delayCalled.countDown();