You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/12/05 15:12:39 UTC
[2/2] activemq-artemis git commit: ARTEMIS-1526 race condition
between listConsumers() and closing a Session. When session not found,
ignore that consumer and continue.
ARTEMIS-1526 Fix race condition between listConsumers() and closing a Session. When the session is not found, ignore that consumer and continue.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7d619697
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7d619697
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7d619697
Branch: refs/heads/master
Commit: 7d61969795a9b55126818233903811bf6a1c30de
Parents: 8f9bab6
Author: Pat Fox <pa...@gmail.com>
Authored: Sun Nov 26 15:00:23 2017 +0100
Committer: Justin Bertram <jb...@apache.org>
Committed: Tue Dec 5 09:10:40 2017 -0600
----------------------------------------------------------------------
.../core/management/impl/view/ConsumerView.java | 18 +++-
.../ActiveMQServerControlMultiThreadTest.java | 93 +++++++++++++++++++-
2 files changed, 108 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7d619697/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
index a54e40b..386425a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/view/ConsumerView.java
@@ -45,7 +45,23 @@ public class ConsumerView extends ActiveMQAbstractView<ServerConsumer> {
@Override
public JsonObjectBuilder toJson(ServerConsumer consumer) {
ServerSession session = server.getSessionByID(consumer.getSessionID());
- JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("sequentialID", toString(consumer.getSequentialID())).add("sessionName", toString(consumer.getSessionName())).add("connectionClientID", toString(consumer.getConnectionClientID())).add("user", toString(session.getUsername())).add("connectionProtocolName", toString(consumer.getConnectionProtocolName())).add("queueName", toString(consumer.getQueueName())).add("queueType", toString(consumer.getQueueType()).toLowerCase()).add("queueAddress", toString(consumer.getQueueAddress().toString())).add("connectionLocalAddress", toString(consumer.getConnectionLocalAddress())).add("connectionRemoteAddress", toString(consumer.getConnectionRemoteAddress())).add("creationTime", new Date(consumer.getCreationTime()).toString());
+
+ //if session is not available then consumer is not in valid state - ignore
+ if (session == null) {
+ return null;
+ }
+
+ JsonObjectBuilder obj = JsonLoader.createObjectBuilder().add("sequentialID", toString(consumer.getSequentialID()))
+ .add("sessionName", toString(consumer.getSessionName()))
+ .add("connectionClientID", toString(consumer.getConnectionClientID()))
+ .add("user", toString(session.getUsername()))
+ .add("connectionProtocolName", toString(consumer.getConnectionProtocolName()))
+ .add("queueName", toString(consumer.getQueueName()))
+ .add("queueType", toString(consumer.getQueueType()).toLowerCase())
+ .add("queueAddress", toString(consumer.getQueueAddress().toString()))
+ .add("connectionLocalAddress", toString(consumer.getConnectionLocalAddress()))
+ .add("connectionRemoteAddress", toString(consumer.getConnectionRemoteAddress()))
+ .add("creationTime", new Date(consumer.getCreationTime()).toString());
return obj;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7d619697/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java
index 2735d04..6f4b886 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ActiveMQServerControlMultiThreadTest.java
@@ -27,9 +27,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.integration.management.ManagementTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
@@ -57,8 +63,7 @@ public class ActiveMQServerControlMultiThreadTest extends ManagementTestBase {
*/
@Test
- @BMRules(rules = {@BMRule(
- name = "Delay listAddress() by 2 secs ",
+ @BMRules(rules = {@BMRule(name = "Delay listAddress() by 2 secs ",
targetClass = "org.apache.activemq.artemis.core.management.impl.view.AddressView ",
targetMethod = "<init>(org.apache.activemq.artemis.core.server.ActiveMQServer)",
targetLocation = "ENTRY",
@@ -109,6 +114,90 @@ public class ActiveMQServerControlMultiThreadTest extends ManagementTestBase {
}
}
+ /**
+ * Aim: verify that no exception occurs when a session is closed during a listConsumers() operation.
+ *
+ * The test delays listConsumers() BEFORE the session information associated with the consumer is retrieved.
+ * During this delay the client session is closed.
+ *
+ * @throws Exception
+ */
+
+ @Test
+ @BMRules(rules = {@BMRule(name = "Delay listConsumers() by 2 secs ",
+ targetClass = "org.apache.activemq.artemis.core.management.impl.view.ConsumerView",
+ targetMethod = "toJson(org.apache.activemq.artemis.core.server.ServerConsumer)",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.artemis.tests.extras.byteman.ActiveMQServerControlMultiThreadTest.delay(2)")})
+
+ public void listConsumersDuringSessionClose() throws Exception {
+
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ SimpleString addressName1 = new SimpleString("MyAddress_one");
+ SimpleString queueName1 = new SimpleString("my_queue_one");
+
+ ActiveMQServerControl serverControl = createManagementControl();
+
+ server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST));
+ server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, false, false);
+
+ // create a consumer
+ try (ServerLocator locator = createInVMNonHALocator(); ClientSessionFactory csf = createSessionFactory(locator);
+ ClientSession session = csf.createSession()) {
+
+ ClientConsumer consumer1_q1 = session.createConsumer(queueName1);
+
+ // add another consumer (on separate session)
+ ClientSession session_two = csf.createSession();
+ ClientConsumer consumer2_q1 = session_two.createConsumer(queueName1);
+
+ //first(normal) invocation - ensure 2 consumers returned
+ //used to block thread, until the delay() has been called.
+ delayCalled = new CountDownLatch(1);
+
+ String consumersAsJsonString = serverControl.listConsumers(createJsonFilter("", "", ""), 1, 10);
+
+ JsonObject consumersAsJsonObject = JsonUtil.readJsonObject(consumersAsJsonString);
+ JsonArray consumersArray = (JsonArray) consumersAsJsonObject.get("data");
+
+ Assert.assertEquals("number of consumers returned from query", 2, consumersArray.size());
+ Assert.assertEquals("check consumer's queue", queueName1.toString(), consumersArray.getJsonObject(0).getString("queueName"));
+ Assert.assertNotEquals("check session", "", consumersArray.getJsonObject(0).getString("sessionName"));
+
+ //second invocation - close session during listConsumers()
+
+ //used to block thread, until the delay() has been called.
+ delayCalled = new CountDownLatch(1);
+
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ //wait until the delay occurs and close the session.
+ delayCalled.await();
+ session.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+ });
+
+ consumersAsJsonString = serverControl.listConsumers(createJsonFilter("", "", ""), 1, 10);
+
+ consumersAsJsonObject = JsonUtil.readJsonObject(consumersAsJsonString);
+ consumersArray = (JsonArray) consumersAsJsonObject.get("data");
+
+ // session is closed before Json string is created - should only be one consumer returned
+ Assert.assertEquals("number of consumers returned from query", 1, consumersArray.size());
+ Assert.assertEquals("check consumer's queue", queueName1.toString(), consumersArray.getJsonObject(0).getString("queueName"));
+ Assert.assertNotEquals("check session", "", consumersArray.getJsonObject(0).getString("sessionName"));
+
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
//notify delay has been called and wait for X seconds
public static void delay(int seconds) {
delayCalled.countDown();