You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/07/16 20:52:53 UTC

[helix] branch master updated: Refine the ZkHelixManager.disconnect() method so it won't be blocked when the ZK connection breaks. (#1149)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b7c991  Refine the ZkHelixManager.disconnect() method so it won't be blocked when the ZK connection breaks. (#1149)
3b7c991 is described below

commit 3b7c991b32dd5d6ff76298f27ac9ed9e22bb2e83
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Thu Jul 16 13:52:46 2020 -0700

    Refine the ZkHelixManager.disconnect() method so it won't be blocked when the ZK connection breaks. (#1149)
    
    The issue was that if ZkHelixManager.disconnect() is called when the ZK connection is not in the SyncConnected state, the disconnect() call hangs and never returns. This usually blocks Helix users' code when they try to clean up and finalize after the ZK connection becomes unrecoverable.
    This PR introduces a temporary thread to finish the cleanup work that relies on the ZK connection (or application code). This thread will be interrupted if the ZK connection breaks before or during the disconnect process.
---
 .../apache/helix/manager/zk/ZKHelixManager.java    | 101 +++++++++++++++++----
 .../helix/integration/TestZkConnectionLost.java    |  32 ++++++-
 2 files changed, 115 insertions(+), 18 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 0b365d4..9bd174e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -29,10 +29,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.management.JMException;
 
 import com.google.common.collect.Sets;
-
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
@@ -90,6 +90,8 @@ import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ChainedPathZkSerializer;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.impl.factory.HelixZkClientFactory;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
@@ -100,8 +102,6 @@ import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.helix.zookeeper.datamodel.serializer.ChainedPathZkSerializer;
-import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 
 public class ZKHelixManager implements HelixManager, IZkStateListener {
   private static Logger LOG = LoggerFactory.getLogger(ZKHelixManager.class);
@@ -817,15 +817,12 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
        */
       _messagingService.getExecutor().shutdown();
 
-      // TODO reset user defined handlers only
-      // TODO Fix the issue that when connection disconnected, reset handlers will be blocked. -- JJ
-      // This is because reset logic contains ZK operations.
-      resetHandlers(true);
-
-      if (_leaderElectionHandler != null) {
-        _leaderElectionHandler.reset(true);
+      if (!cleanupCallbackHandlers()) {
+        LOG.warn(
+            "The callback handler cleanup has been cleanly done. "
+                + "Some callback handlers might not be reset properly. "
+                + "Continue to finish the other Helix Mananger disconnect tasks.");
       }
-
     } finally {
       GenericHelixController controller = _controller;
       if (controller != null) {
@@ -836,11 +833,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         }
       }
 
-      ParticipantManager participantManager = _participantManager;
-      if (participantManager != null) {
-        participantManager.disconnect();
-      }
-
       for (HelixCallbackMonitor callbackMonitor : _callbackMonitors.values()) {
         callbackMonitor.unregister();
       }
@@ -866,6 +858,83 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     }
   }
 
+  /**
+   * The callback handler cleanup operations that require an active ZkClient connection.
+   * If ZkClient is not connected, Helix Manager shall skip the cleanup.
+   *
+   * @return true if the cleanup has been done successfully.
+   */
+  private boolean cleanupCallbackHandlers() {
+    AtomicBoolean cleanupDone = new AtomicBoolean(false);
+
+    if (_zkclient.waitUntilConnected(_waitForConnectedTimeout, TimeUnit.MILLISECONDS)) {
+      // Create a separate thread for executing cleanup task to avoid forever retry.
+      Thread cleanupThread = new Thread(String
+          .format("Cleanup thread for %s-%s-%s", _clusterName, _instanceName, _instanceType)) {
+        @Override
+        public void run() {
+          // TODO reset user defined handlers only
+          resetHandlers(true);
+
+          if (_leaderElectionHandler != null) {
+            _leaderElectionHandler.reset(true);
+          }
+
+          ParticipantManager participantManager = _participantManager;
+          if (participantManager != null) {
+            participantManager.disconnect();
+          }
+
+          cleanupDone.set(true);
+        }
+      };
+
+      // Define the state listener to terminate the cleanup thread when the ZkConnection breaks.
+      IZkStateListener stateListener = new IZkStateListener() {
+        @Override
+        public void handleStateChanged(KeeperState state) {
+          // If the connection breaks during the cleanup , then stop the cleanup thread.
+          if (state != KeeperState.SyncConnected) {
+            cleanupThread.interrupt();
+          }
+        }
+
+        @Override
+        public void handleNewSession(String sessionId) {
+          // nothing
+        }
+
+        @Override
+        public void handleSessionEstablishmentError(Throwable error) {
+          // nothing
+        }
+      };
+
+      cleanupThread.start();
+      try {
+        // Subscribe and check the connection status one more time to ensure the thread is running
+        // with an active ZkConnection.
+        _zkclient.subscribeStateChanges(stateListener);
+        if (!_zkclient.waitUntilConnected(0, TimeUnit.MILLISECONDS)) {
+          cleanupThread.interrupt();
+        }
+
+        try {
+          cleanupThread.join();
+        } catch (InterruptedException ex) {
+          cleanupThread.interrupt();
+        }
+      } finally {
+        _zkclient.unsubscribeStateChanges(stateListener);
+      }
+    } else {
+      LOG.warn(
+          "ZkClient is not connected to the Zookeeper. Skip the cleanup work that requires accessing Zookeeper.");
+    }
+
+    return cleanupDone.get();
+  }
+
   @Override
   public String getSessionId() {
     checkConnected(_waitForConnectedTimeout);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
index c790a45..740890e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkConnectionLost.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.ImmutableMap;
@@ -39,8 +40,6 @@ import org.apache.helix.integration.task.TaskTestUtil;
 import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
-import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.TaskState;
@@ -48,6 +47,8 @@ import org.apache.helix.task.WorkflowContext;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.ZkServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,6 +105,33 @@ public class TestZkConnectionLost extends TaskTestBase {
   }
 
   @Test
+  public void testDisconnectWhenConnectionBreak() throws Exception {
+    String controllerName = CONTROLLER_PREFIX + "_" + TestHelper.getTestMethodName();
+    ClusterControllerManager controllerManager =
+        new ClusterControllerManager(_zkAddr, CLUSTER_NAME, controllerName);
+    controllerManager.syncStart();
+
+    TestHelper.stopZkServer(_zkServerRef.get());
+    AtomicBoolean disconnected = new AtomicBoolean(false);
+    Thread testThread = new Thread("Testing HelixManager disconnect") {
+      @Override
+      public void run() {
+        controllerManager.disconnect();
+        disconnected.set(true);
+      }
+    };
+    try {
+      testThread.start();
+      testThread.join(10000);
+      Assert.assertTrue(disconnected.get());
+      Assert.assertFalse(controllerManager.isConnected());
+    } finally {
+      testThread.interrupt();
+      _zkServerRef.set(TestHelper.startZkServer(_zkAddr, null, false));
+    }
+  }
+
+  @Test
   public void testLostZkConnection() throws Exception {
     System.setProperty(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT, "1000");
     System.setProperty(SystemPropertyKeys.ZK_SESSION_TIMEOUT, "1000");