You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/11/14 15:24:04 UTC

[2/2] activemq-artemis git commit: ARTEMIS-2174 Broker reconnect cause OOM with scale down

ARTEMIS-2174 Broker reconnect cause OOM with scale down

When a node tries to reconnect to another node in a scale-down cluster,
the other node denies the reconnect request and the node keeps retrying,
which causes tasks to accumulate in the ordered executor and eventually an OOM.

The fix changes ActiveMQPacketHandler#handleCheckForFailover to allow
the reconnect when the node being checked for scale down is the node itself.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6e89b22e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6e89b22e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6e89b22e

Branch: refs/heads/master
Commit: 6e89b22eaae8cd82852ae3d0a643bb3502cf994c
Parents: 256e7c8
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Nov 14 19:21:48 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 14 10:23:57 2018 -0500

----------------------------------------------------------------------
 .../core/impl/ActiveMQPacketHandler.java        |  2 +-
 .../core/server/cluster/ClusterController.java  |  4 +
 .../tests/integration/server/ScaleDownTest.java | 81 ++++++++++++++++++++
 3 files changed, 86 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6e89b22e/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index e12ca0c..eecc187 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -120,7 +120,7 @@ public class ActiveMQPacketHandler implements ChannelHandler {
 
    private void handleCheckForFailover(CheckFailoverMessage failoverMessage) {
       String nodeID = failoverMessage.getNodeID();
-      boolean okToFailover = nodeID == null || !(server.getHAPolicy().canScaleDown() && !server.hasScaledDown(new SimpleString(nodeID)));
+      boolean okToFailover = nodeID == null || nodeID.equals(server.getNodeID().toString()) || !server.getHAPolicy().canScaleDown() || server.hasScaledDown(new SimpleString(nodeID));
       channel1.send(new CheckFailoverReplyMessage(okToFailover));
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6e89b22e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
index 15cf04e..86cd0df 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java
@@ -311,6 +311,10 @@ public class ClusterController implements ActiveMQComponent {
       this.replicatedClusterName = new SimpleString(replicatedClusterName);
    }
 
+   public Map<SimpleString, ServerLocatorInternal> getLocators() {
+      return java.util.Collections.unmodifiableMap(this.locators); // read-only view: callers must not mutate the controller's locators
+   }
+
    /**
     * a handler for handling packets sent between the cluster.
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6e89b22e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
index 2a395d7..c986910 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java
@@ -16,9 +16,15 @@
  */
 package org.apache.activemq.artemis.tests.integration.server;
 
+import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -29,13 +35,18 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
 import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
 import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.ClusterController;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.junit.Wait;
 import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.junit.Assert;
@@ -146,6 +157,76 @@ public class ScaleDownTest extends ClusterTestBase {
       removeConsumer(0);
    }
 
+
+   /**
+    * Regression test for ARTEMIS-2174: after server1 restarts, server0's reconnect must be
+    * granted instead of being refused and retried endlessly.
+    */
+   @Test
+   public void testScaleDownNodeReconnect() throws Exception {
+
+      try {
+         // take the first locator held by server0's cluster controller
+         ClusterController clusterController = servers[0].getClusterManager().getClusterController();
+         Iterator<Map.Entry<SimpleString, ServerLocatorInternal>> locatorEntries =
+            clusterController.getLocators().entrySet().iterator();
+         assertTrue(locatorEntries.hasNext());
+         ServerLocatorImpl server0Locator = (ServerLocatorImpl) locatorEntries.next().getValue();
+
+         waitForClusterConnected(server0Locator);
+
+         // restart server1 so that server0 has to reconnect to it
+         servers[1].stop();
+         servers[1].start();
+
+         // At this point server0 is trying to reconnect to server1. Normally server1 grants
+         // such a reconnection only after checking that the scale-down node named in the
+         // request has already been scaled down. When that node is server1 itself, the
+         // connection must be granted without the scale-down check; otherwise it is never
+         // established, and the repeated reconnect attempts keep adding tasks to the
+         // ClientSessionFactory's closeExecutor until the JVM runs out of memory (OOM).
+         checkClusterConnectionExecutorNotBlocking(server0Locator);
+      } finally {
+         servers[1].stop();
+         servers[0].stop();
+      }
+   }
+
+   /**
+    * Asserts that the closeExecutor of the locator's single session factory still runs tasks,
+    * i.e. that it has not been clogged by repeated reconnect attempts.
+    *
+    * @param locator the locator whose (only) session factory is inspected via reflection
+    */
+   private void checkClusterConnectionExecutorNotBlocking(ServerLocatorImpl locator) throws NoSuchFieldException, IllegalAccessException, InterruptedException {
+      Field factoriesField = locator.getClass().getDeclaredField("factories");
+      factoriesField.setAccessible(true);
+      Set<?> factories = (Set<?>) factoriesField.get(locator);
+      assertEquals(1, factories.size());
+
+      ClientSessionFactoryImpl factory = (ClientSessionFactoryImpl) factories.iterator().next();
+
+      Field executorField = factory.getClass().getDeclaredField("closeExecutor");
+      executorField.setAccessible(true);
+      Executor pool = (Executor) executorField.get(factory);
+      final CountDownLatch latch = new CountDownLatch(1);
+      pool.execute(latch::countDown);
+      // an interrupt now surfaces as a test error instead of being misreported as a blocked executor
+      assertTrue("executor got blocked.", latch.await(10, TimeUnit.SECONDS));
+   }
+
+   /**
+    * Polls with {@link Wait#waitFor} (timeout 5000) until the locator's topology is no
+    * longer empty, and fails the test if that never happens.
+    *
+    * @param locator the locator whose topology is watched
+    */
+   private void waitForClusterConnected(ServerLocatorImpl locator) throws Exception {
+      Wait.Condition topologyKnown = () -> !locator.getTopology().isEmpty();
+
+      assertTrue("topology should not be empty", Wait.waitFor(topologyKnown, 5000));
+   }
+
    @Test
    public void testStoreAndForward() throws Exception {
       final int TEST_SIZE = 50;