You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2019/08/19 15:50:15 UTC

[activemq-artemis] branch master updated: ARTEMIS-2368 Fix races on closing consumer

This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 448449c  ARTEMIS-2368 Fix races on closing consumer
     new 37fd9fe  This closes #2693
448449c is described below

commit 448449c88e33bf189482df0c3b722ced4f0a9d31
Author: yang wei <wy...@gmail.com>
AuthorDate: Tue Jun 4 15:04:20 2019 +0800

    ARTEMIS-2368 Fix races on closing consumer
---
 .../core/client/impl/ClientSessionImpl.java        |  21 ++-
 .../RaceOnClosingConsumerWhileReconnecting.java    | 187 +++++++++++++++++++++
 .../tests/integration/remoting/ReconnectTest.java  |  54 ++++++
 3 files changed, 257 insertions(+), 5 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 58b9eee..ec2dc79 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -1377,7 +1377,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
                sessionContext.resetName(name);
 
-               for (ClientConsumerInternal consumer : cloneConsumers()) {
+               Map<ConsumerContext, ClientConsumerInternal> clonedConsumerEntries = cloneConsumerEntries();
+
+               for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) {
                   consumer.clearAtFailover();
                }
 
@@ -1395,11 +1397,14 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
                if (!inClose && mayAttemptToFailover) {
                   sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge);
 
-                  for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : consumers.entrySet()) {
+                  for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : clonedConsumerEntries.entrySet()) {
 
                      ClientConsumerInternal consumerInternal = entryx.getValue();
-
-                     sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started);
+                     synchronized (consumerInternal) {
+                        if (!consumerInternal.isClosed()) {
+                           sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started);
+                        }
+                     }
                   }
 
                   if ((!autoCommitAcks || !autoCommitSends) && workDone) {
@@ -1414,7 +1419,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
                   // Now start the session if it was already started
                   if (started) {
-                     for (ClientConsumerInternal consumer : cloneConsumers()) {
+                     for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) {
                         consumer.clearAtFailover();
                         consumer.start();
                      }
@@ -2082,6 +2087,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       }
    }
 
+   public Map<ConsumerContext, ClientConsumerInternal> cloneConsumerEntries() {
+      synchronized (consumers) {
+         return new HashMap<>(consumers);
+      }
+   }
+
    private void closeChildren() throws ActiveMQException {
       Set<ClientConsumerInternal> consumersClone = cloneConsumers();
 
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnClosingConsumerWhileReconnecting.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnClosingConsumerWhileReconnecting.java
new file mode 100644
index 0000000..db27c0a
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnClosingConsumerWhileReconnecting.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class RaceOnClosingConsumerWhileReconnecting extends ActiveMQTestBase {
+   static RemotingConnection conn;
+
+   static ClientConsumer consumer;
+
+   protected ActiveMQServer server = null;
+
+   protected ClientSessionFactoryInternal sf = null;
+
+   protected ClientSessionInternal session = null;
+
+   protected final SimpleString queueName1 = new SimpleString("my_queue_one");
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      conn = null;
+      consumer = null;
+      server = createServer(true, true);
+      server.start();
+
+      SimpleString addressName1 = new SimpleString("my_address_one");
+
+      server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST));
+      server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, true, false);
+
+      final long retryInterval = 500;
+      final double retryMultiplier = 1d;
+      final int reconnectAttempts = 10;
+      ServerLocator locator = createFactory(true).setCallFailoverTimeout(0).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
+      sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
+      session = (ClientSessionInternal)sf.createSession(false, true, true);
+
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      if (session != null) {
+         session.close();
+      }
+      if (sf != null) {
+         sf.close();
+      }
+      if (server != null) {
+         server.stop();
+      }
+      conn = null;
+      consumer = null;
+      super.tearDown();
+   }
+
+   @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "session.removeConsumer wait",
+         targetClass = "org.apache.activemq.artemis.core.client.impl.ClientSessionImpl",
+         targetMethod = "removeConsumer(org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal)",
+         targetLocation = "ENTRY",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnClosingConsumerWhileReconnecting.waitForReconnection();")})
+   public void testClosingConsumerBeforeReconnecting() throws Exception {
+      conn = session.getConnection();
+
+      ClientConsumer clientConsumer1 = session.createConsumer(queueName1);
+      ClientConsumer clientConsumer2 = session.createConsumer(queueName1);
+      clientConsumer1.close();
+
+      Thread.sleep(500);
+      Set<ServerConsumer> serverConsumers = server.getSessionByID(session.getName()).getServerConsumers();
+      ServerConsumer serverConsumer = serverConsumers.iterator().next();
+      assertEquals(1, serverConsumers.size());
+      assertEquals(clientConsumer2.getConsumerContext().getId(), serverConsumer.getID());
+   }
+
+   @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "session.closeConsumer before recreating consumer",
+         targetClass = "org.apache.activemq.artemis.core.client.impl.ClientSessionImpl",
+         targetMethod = "handleFailover",
+         targetLocation = "AFTER WRITE $consumerInternal 1",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnClosingConsumerWhileReconnecting.closeConsumer();")})
+   public void testClosingConsumerBeforeRecreatingOneConsumer() throws Exception {
+      RemotingConnection conn = session.getConnection();
+
+      ClientConsumer clientConsumer1 = session.createConsumer(queueName1);
+      consumer = clientConsumer1;
+      conn.fail(new ActiveMQNotConnectedException());
+
+      Thread.sleep(500);
+      Set<ServerConsumer> serverConsumers = server.getSessionByID(session.getName()).getServerConsumers();
+      assertEquals(0, serverConsumers.size());
+   }
+
+   @Test
+   @BMRules(
+      rules = {@BMRule(
+         name = "session.closeConsumer before recreating consumer",
+         targetClass = "org.apache.activemq.artemis.core.client.impl.ClientSessionImpl",
+         targetMethod = "handleFailover",
+         targetLocation = "AFTER WRITE $consumerInternal 1",
+         action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnClosingConsumerWhileReconnecting.closeConsumer();")})
+   public void testClosingConsumerBeforeRecreatingTwoConsumers() throws Exception {
+      RemotingConnection conn = session.getConnection();
+
+      ClientConsumer clientConsumer1 = session.createConsumer(queueName1);
+      ClientConsumer clientConsumer2 = session.createConsumer(queueName1);
+      consumer = clientConsumer1;
+      conn.fail(new ActiveMQNotConnectedException());
+
+      Thread.sleep(500);
+      ServerSession serverSession = server.getSessionByID(session.getName());
+      assertNotNull(serverSession);
+      Set<ServerConsumer> serverConsumers = serverSession.getServerConsumers();
+      ServerConsumer serverConsumer = serverConsumers.iterator().next();
+      assertEquals(1, serverConsumers.size());
+      assertEquals(clientConsumer2.getConsumerContext().getId(), serverConsumer.getID());
+   }
+
+   public static void closeConsumer() {
+      if (consumer != null) {
+         try {
+            consumer.close();
+         } catch (Exception e) {
+            e.printStackTrace();
+         } finally {
+            consumer = null;
+         }
+      }
+   }
+
+   public static void waitForReconnection() {
+      if (conn != null) {
+         try {
+            conn.fail(new ActiveMQNotConnectedException());
+         } catch (Exception e) {
+            e.printStackTrace();
+         } finally {
+            conn = null;
+         }
+      }
+   }
+}
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
index c15b175..8c9ff19 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.remoting;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -26,6 +27,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
 import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
@@ -37,7 +41,9 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.Assert;
@@ -370,6 +376,54 @@ public class ReconnectTest extends ActiveMQTestBase {
       server.stop();
    }
 
+   @Test
+   public void testClosingConsumerTimeout() throws Exception {
+      ActiveMQServer server = createServer(true, true);
+      server.start();
+
+      // Simulate a consumer close timeout: the interceptor returns false for the first SESS_CONSUMER_CLOSE packet it sees
+      Interceptor reattachInterceptor = new Interceptor() {
+         boolean consumerClosed;
+
+         @Override
+         public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
+            if (!consumerClosed && packet.getType() == PacketImpl.SESS_CONSUMER_CLOSE) {
+               consumerClosed = true;
+               return false;
+            } else {
+               return true;
+            }
+
+         }
+      };
+      server.getRemotingService().addIncomingInterceptor(reattachInterceptor);
+
+      final long retryInterval = 500;
+      final double retryMultiplier = 1d;
+      final int reconnectAttempts = 10;
+      ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
+
+      ClientSessionInternal session = (ClientSessionInternal)sf.createSession(false, true, true);
+      SimpleString queueName1 = new SimpleString("my_queue_one");
+      SimpleString addressName1 = new SimpleString("my_address_one");
+
+      server.addAddressInfo(new AddressInfo(addressName1, RoutingType.ANYCAST));
+      server.createQueue(addressName1, RoutingType.ANYCAST, queueName1, null, true, false);
+      ClientConsumer clientConsumer1 = session.createConsumer(queueName1);
+      ClientConsumer clientConsumer2 = session.createConsumer(queueName1);
+      clientConsumer1.close();
+
+      Set<ServerConsumer> serverConsumers = server.getSessionByID(session.getName()).getServerConsumers();
+      ServerConsumer serverConsumer = serverConsumers.iterator().next();
+      assertEquals(1, serverConsumers.size());
+      assertEquals(clientConsumer2.getConsumerContext().getId(), serverConsumer.getID());
+
+      session.close();
+      sf.close();
+      server.stop();
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------