You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/12 20:48:32 UTC

[3/4] helix git commit: Make ZkClient keep retrying connect on expiring.

Make ZkClient keep retrying connect on expiring.

This prevents ZK reconnect failures caused by transient network issues.
With this change in ZkClient, the retry in HelixManager is no longer needed, so deprecate the related configuration option (ZK_REESTABLISHMENT_CONNECTION_TIMEOUT) and simplify the handleSessionEstablishmentError logic.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/05583449
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/05583449
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/05583449

Branch: refs/heads/master
Commit: 0558344963115414cbec15c752d44b4eec3cc6c4
Parents: c145c7c
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Tue Jun 5 10:44:55 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Thu Jul 12 13:45:17 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/SystemPropertyKeys.java    |   1 +
 .../apache/helix/manager/zk/ZKHelixManager.java |  35 +--
 .../helix/manager/zk/zookeeper/ZkClient.java    |  39 ++-
 .../helix/util/ExponentialBackoffStrategy.java  |  33 +++
 .../helix/integration/TestZkReconnect.java      | 241 -------------------
 .../helix/manager/zk/TestZkReconnect.java       | 239 ++++++++++++++++++
 6 files changed, 310 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
index 7af9635..aa8535b 100644
--- a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -15,6 +15,7 @@ public class SystemPropertyKeys {
 
   public static final String ZK_CONNECTION_TIMEOUT = "zk.connection.timeout";
 
+  @Deprecated
   public static final String ZK_REESTABLISHMENT_CONNECTION_TIMEOUT =
       "zk.connectionReEstablishment.timeout";
 

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 4eb037a..5620ef6 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -68,7 +68,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   public static final int FLAPPING_TIME_WINDOW = 300000; // Default to 300 sec
   public static final int MAX_DISCONNECT_THRESHOLD = 5;
   public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
-  private static final int DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT = 120000; // Default to 120 sec
   private static final int DEFAULT_WAIT_CONNECTED_TIMEOUT = 10 * 1000;  // wait until connected for up to 10 seconds.
 
   protected final String _zkAddress;
@@ -78,7 +77,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   private final int _waitForConnectedTimeout; // wait time for testing connect
   private final int _sessionTimeout; // client side session timeout, will be overridden by server timeout. Disconnect after timeout
   private final int _connectionInitTimeout; // client timeout to init connect
-  private final int _connectionRetryTimeout; // retry when connect being re-established
   private final List<PreConnectCallback> _preConnectCallbacks;
   protected final List<CallbackHandler> _handlers;
   private final HelixManagerProperties _properties;
@@ -233,10 +231,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         .getSystemPropertyAsInt(SystemPropertyKeys.ZK_CONNECTION_TIMEOUT,
             ZkClient.DEFAULT_CONNECTION_TIMEOUT);
 
-    _connectionRetryTimeout = HelixUtil
-        .getSystemPropertyAsInt(SystemPropertyKeys.ZK_REESTABLISHMENT_CONNECTION_TIMEOUT,
-            DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT);
-
     _waitForConnectedTimeout = HelixUtil
         .getSystemPropertyAsInt(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT,
             DEFAULT_WAIT_CONNECTED_TIMEOUT);
@@ -1060,38 +1054,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
   @Override
   public void handleSessionEstablishmentError(Throwable error) throws Exception {
-    LOG.warn("Handling Session Establishment Error. Try to reset connection.", error);
+    LOG.warn("Handling Session Establishment Error. Disconnect Helix Manager.", error);
     // Cleanup ZKHelixManager
     if (_zkclient != null) {
       _zkclient.close();
     }
     disconnect();
-    // Try to establish connections
-    long operationStartTime = System.currentTimeMillis();
-    while (!isConnected()) {
-      try {
-        connect();
-        break;
-      } catch (Exception e) {
-        if (System.currentTimeMillis() - operationStartTime >= _connectionRetryTimeout) {
-          break;
-        }
-        // If retry fails, use the latest exception.
-        error = e;
-        LOG.error("Fail to reset connection after session establishment error happens. Will retry.", error);
-        // Yield until next retry.
-        Thread.yield();
-      }
-    }
 
-    if (!isConnected()) {
-      LOG.error("Fail to reset connection after session establishment error happens.", error);
-      // retry failed, trigger error handler
-      if (_stateListener != null) {
-        _stateListener.onDisconnected(this, error);
-      }
-    } else {
-      LOG.info("Connection is recovered.");
+    if (_stateListener != null) {
+      _stateListener.onDisconnected(this, error);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 0aa6587..0939826 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -45,6 +45,7 @@ import org.apache.helix.manager.zk.PathBasedZkSerializer;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks;
 import org.apache.helix.manager.zk.zookeeper.ZkEventThread.ZkEvent;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
+import org.apache.helix.util.ExponentialBackoffStrategy;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -67,6 +68,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ZkClient implements Watcher {
   private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
+  private static long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
 
   protected final IZkConnection _connection;
   protected final long operationRetryTimeoutInMillis;
@@ -783,16 +785,40 @@ public class ZkClient implements Watcher {
     }
     fireStateChangedEvent(event.getState());
     if (event.getState() == KeeperState.Expired) {
+      reconnectOnExpiring();
+    }
+  }
+
+  private void reconnectOnExpiring() {
+    int retryCount = 0;
+    ExponentialBackoffStrategy retryStrategy =
+        new ExponentialBackoffStrategy(MAX_RECONNECT_INTERVAL_MS, true);
+
+    Exception reconnectException = new ZkException("Shutdown triggered.");
+    while (!_closed) {
       try {
         reconnect();
         fireNewSessionEvents();
-      } catch (final Exception e) {
-        LOG.warn(
-            "Unable to re-establish connection. Notifying consumer of the following exception: ",
-            e);
-        fireSessionEstablishmentError(e);
+        return;
+      } catch (ZkInterruptedException interrupt) {
+        reconnectException = interrupt;
+        break;
+      } catch (Exception e) {
+        reconnectException = e;
+        long waitInterval = retryStrategy.getNextWaitInterval(retryCount++);
+        LOG.warn("ZkClient reconnect on expiring failed. Will retry after {} ms", waitInterval, e);
+        try {
+          Thread.sleep(waitInterval);
+        } catch (InterruptedException ex) {
+          reconnectException = ex;
+          break;
+        }
       }
     }
+
+    LOG.info("Unable to re-establish connection. Notifying consumer of the following exception: ",
+        reconnectException);
+    fireSessionEstablishmentError(reconnectException);
   }
 
   private void fireNewSessionEvents() {
@@ -1437,6 +1463,9 @@ public class ZkClient implements Watcher {
    */
   public void connect(final long maxMsToWaitUntilConnected, Watcher watcher)
       throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
+    if (_closed) {
+      throw new IllegalStateException("ZkClient already closed!");
+    }
     boolean started = false;
     acquireEventLock();
     try {

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/main/java/org/apache/helix/util/ExponentialBackoffStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/ExponentialBackoffStrategy.java b/helix-core/src/main/java/org/apache/helix/util/ExponentialBackoffStrategy.java
new file mode 100644
index 0000000..b1a66c9
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/ExponentialBackoffStrategy.java
@@ -0,0 +1,33 @@
+package org.apache.helix.util;
+
+import java.util.Random;
+
+public class ExponentialBackoffStrategy {
+  private final long INIT_RETRY_INTERVAL = 500;
+  private final long _maxRetryInterval;
+  private final boolean _addJitter;
+  private final Random _ran;
+
+  public ExponentialBackoffStrategy(long maxRetryInterval, boolean addJitter) {
+    _maxRetryInterval = maxRetryInterval;
+    _addJitter = addJitter;
+    _ran = new Random(System.currentTimeMillis());
+  }
+
+  public long getNextWaitInterval(int numberOfTriesFailed) {
+    double exponentialMultiplier = Math.pow(2.0, numberOfTriesFailed - 1);
+    double result = exponentialMultiplier * INIT_RETRY_INTERVAL;
+
+    if (_maxRetryInterval > 0 && result > _maxRetryInterval) {
+      result = _maxRetryInterval;
+    }
+
+    if (_addJitter) {
+      // Apply jitter by scaling the result to between 75% and 100% of its computed value.
+      // Jitter is applied by scaling down rather than adding, so it cannot exceed the max retry interval.
+      result = result * (0.75 + _ran.nextDouble() % 0.25);
+    }
+
+    return (long) result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
deleted file mode 100644
index aa23257..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
+++ /dev/null
@@ -1,241 +0,0 @@
-package org.apache.helix.integration;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.I0Itec.zkclient.ZkServer;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.TestHelper;
-import org.apache.helix.manager.zk.HelixManagerStateListener;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestZkReconnect {
-  private static final Logger LOG = LoggerFactory.getLogger(TestZkReconnect.class);
-
-  @Test (enabled = false)
-  public void testZKReconnect() throws Exception {
-    final AtomicReference<ZkServer> zkServerRef = new AtomicReference<ZkServer>();
-    final int zkPort = TestHelper.getRandomPort();
-    final String zkAddr = String.format("localhost:%d", zkPort);
-    ZkServer zkServer = TestHelper.startZkServer(zkAddr);
-    zkServerRef.set(zkServer);
-
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-
-    // Setup cluster
-    LOG.info("Setup clusters");
-    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
-    clusterSetup.addCluster(clusterName, true);
-
-    // Registers and starts controller
-    LOG.info("Starts controller");
-    HelixManager controller =
-        HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr);
-    controller.connect();
-
-    // Registers and starts participant
-    LOG.info("Starts participant");
-    String hostname = "localhost";
-    String instanceId = String.format("%s_%d", hostname, 1);
-    clusterSetup.addInstanceToCluster(clusterName, instanceId);
-    HelixManager participant =
-        HelixManagerFactory.getZKHelixManager(clusterName, instanceId, InstanceType.PARTICIPANT,
-            zkAddr);
-    participant.connect();
-
-    LOG.info("Register state machine");
-    final CountDownLatch latch = new CountDownLatch(1);
-    participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
-        new StateModelFactory<StateModel>() {
-          @Override
-          public StateModel createNewStateModel(String resource, String stateUnitKey) {
-            return new SimpleStateModel(latch);
-          }
-        }, "test");
-
-    String resourceName = "test-resource";
-    LOG.info("Ideal state assignment");
-    HelixAdmin helixAdmin = participant.getClusterManagmentTool();
-    helixAdmin.addResource(clusterName, resourceName, 1, "OnlineOffline",
-        IdealState.RebalanceMode.CUSTOMIZED.toString());
-
-    IdealState idealState = helixAdmin.getResourceIdealState(clusterName, resourceName);
-    idealState.setReplicas("1");
-    idealState.setStateModelFactoryName("test");
-    idealState.setPartitionState(resourceName + "_0", instanceId, "ONLINE");
-
-    LOG.info("Shutdown ZK server");
-    TestHelper.stopZkServer(zkServerRef.get());
-    Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
-
-      @Override
-      public void run() {
-        try {
-          LOG.info("Restart ZK server");
-          // zkServer.set(TestUtils.startZookeeper(zkDir, zkPort));
-          zkServerRef.set(TestHelper.startZkServer(zkAddr, null, false));
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-        }
-      }
-    }, 2L, TimeUnit.SECONDS);
-
-    // future.get();
-
-    LOG.info("Before update ideal state");
-    helixAdmin.setResourceIdealState(clusterName, resourceName, idealState);
-    LOG.info("After update ideal state");
-
-    LOG.info("Wait for OFFLINE->ONLINE state transition");
-    try {
-      Assert.assertTrue(latch.await(15, TimeUnit.SECONDS));
-
-      // wait until stable state
-      boolean result =
-          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkAddr,
-              clusterName));
-      Assert.assertTrue(result);
-
-    } finally {
-      participant.disconnect();
-      zkServerRef.get().shutdown();
-    }
-  }
-
-  @Test
-  public void testHelixManagerStateListenerCallback() throws Exception {
-    final int zkPort = TestHelper.getRandomPort();
-    final String zkAddr = String.format("localhost:%d", zkPort);
-    final ZkServer zkServer = TestHelper.startZkServer(zkAddr);
-
-    String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    final String clusterName = className + "_" + methodName;
-
-    // Init onDisconnectedFlag to check if callback is triggered
-    final AtomicReference<Boolean> onDisconnectedFlag = new AtomicReference<>(false);
-    final AtomicReference<Boolean> onConnectedFlag = new AtomicReference<>(false);
-
-    // Setup cluster
-    LOG.info("Setup clusters");
-    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
-    clusterSetup.addCluster(clusterName, true);
-    // For fast test, set short timeout
-    System.setProperty("zk.connection.timeout", "2000");
-    System.setProperty("zk.connectionReEstablishment.timeout", "1000");
-
-    // Registers and starts controller, register listener for disconnect handling
-    LOG.info("Starts controller");
-    final ZKHelixManager controller =
-        (ZKHelixManager) HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr,
-            new HelixManagerStateListener() {
-              @Override
-              public void onConnected(HelixManager helixManager) throws Exception {
-                Assert.assertEquals(helixManager.getClusterName(), clusterName);
-                onConnectedFlag.getAndSet(true);
-              }
-
-              @Override
-              public void onDisconnected(HelixManager helixManager, Throwable error) throws Exception {
-                Assert.assertEquals(helixManager.getClusterName(), clusterName);
-                onDisconnectedFlag.getAndSet(true);
-              }
-            });
-
-    try {
-      controller.connect();
-      Assert.assertTrue(onConnectedFlag.getAndSet(false));
-      ZkHelixPropertyStore propertyStore = controller.getHelixPropertyStore();
-
-      // 1. shutdown zkServer and check if handler trigger callback
-      zkServer.shutdown();
-
-      // Retry will fail, and onDisconnectedFlag should be set within onDisconnected handler
-      controller.handleSessionEstablishmentError(new Exception("For testing"));
-      Assert.assertTrue(onDisconnectedFlag.get());
-      Assert.assertFalse(onConnectedFlag.get());
-      Assert.assertFalse(controller.isConnected());
-
-      // Verify ZK is down
-      try {
-        propertyStore.get("/", null, 0);
-        Assert.fail("propertyStore should be disconnected.");
-      } catch (IllegalStateException e) {
-        // Expected exception
-        System.out.println(e.getMessage());
-      }
-
-      // 2. restart zkServer and check if handler will recover connection
-      onDisconnectedFlag.getAndSet(false);
-      zkServer.start();
-
-      // Retry will succeed, and onDisconnectedFlag should not be set
-      controller.handleSessionEstablishmentError(new Exception("For testing"));
-      Assert.assertFalse(onDisconnectedFlag.get());
-      Assert.assertTrue(onConnectedFlag.get());
-
-      // New propertyStore should be in good state
-      propertyStore = controller.getHelixPropertyStore();
-      propertyStore.get("/", null, 0);
-    } finally {
-      controller.disconnect();
-      zkServer.shutdown();
-      System.clearProperty("zk.connection.timeout");
-      System.clearProperty("zk.connectionReEstablishment.timeout");
-    }
-  }
-
-  public static final class SimpleStateModel extends StateModel {
-
-    private final CountDownLatch latch;
-
-    public SimpleStateModel(CountDownLatch latch) {
-      this.latch = latch;
-    }
-
-    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
-      // LOG.info(HelixUtils.toString(message));
-      LOG.info("message: " + message);
-      latch.countDown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/05583449/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
new file mode 100644
index 0000000..e2d36b2
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
@@ -0,0 +1,239 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.helix.*;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkReconnect {
+  private static final Logger LOG = LoggerFactory.getLogger(TestZkReconnect.class);
+
+  @Test (enabled = false)
+  public void testZKReconnect() throws Exception {
+    final AtomicReference<ZkServer> zkServerRef = new AtomicReference<ZkServer>();
+    final int zkPort = TestHelper.getRandomPort();
+    final String zkAddr = String.format("localhost:%d", zkPort);
+    ZkServer zkServer = TestHelper.startZkServer(zkAddr);
+    zkServerRef.set(zkServer);
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    // Setup cluster
+    LOG.info("Setup clusters");
+    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
+    clusterSetup.addCluster(clusterName, true);
+
+    // Registers and starts controller
+    LOG.info("Starts controller");
+    HelixManager controller =
+        HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr);
+    controller.connect();
+
+    // Registers and starts participant
+    LOG.info("Starts participant");
+    String hostname = "localhost";
+    String instanceId = String.format("%s_%d", hostname, 1);
+    clusterSetup.addInstanceToCluster(clusterName, instanceId);
+    HelixManager participant =
+        HelixManagerFactory.getZKHelixManager(clusterName, instanceId, InstanceType.PARTICIPANT,
+            zkAddr);
+    participant.connect();
+
+    LOG.info("Register state machine");
+    final CountDownLatch latch = new CountDownLatch(1);
+    participant.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
+        new StateModelFactory<StateModel>() {
+          @Override
+          public StateModel createNewStateModel(String resource, String stateUnitKey) {
+            return new SimpleStateModel(latch);
+          }
+        }, "test");
+
+    String resourceName = "test-resource";
+    LOG.info("Ideal state assignment");
+    HelixAdmin helixAdmin = participant.getClusterManagmentTool();
+    helixAdmin.addResource(clusterName, resourceName, 1, "OnlineOffline",
+        IdealState.RebalanceMode.CUSTOMIZED.toString());
+
+    IdealState idealState = helixAdmin.getResourceIdealState(clusterName, resourceName);
+    idealState.setReplicas("1");
+    idealState.setStateModelFactoryName("test");
+    idealState.setPartitionState(resourceName + "_0", instanceId, "ONLINE");
+
+    LOG.info("Shutdown ZK server");
+    TestHelper.stopZkServer(zkServerRef.get());
+    Executors.newSingleThreadScheduledExecutor().schedule(new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          LOG.info("Restart ZK server");
+          // zkServer.set(TestUtils.startZookeeper(zkDir, zkPort));
+          zkServerRef.set(TestHelper.startZkServer(zkAddr, null, false));
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
+      }
+    }, 2L, TimeUnit.SECONDS);
+
+    // future.get();
+
+    LOG.info("Before update ideal state");
+    helixAdmin.setResourceIdealState(clusterName, resourceName, idealState);
+    LOG.info("After update ideal state");
+
+    LOG.info("Wait for OFFLINE->ONLINE state transition");
+    try {
+      Assert.assertTrue(latch.await(15, TimeUnit.SECONDS));
+
+      // wait until stable state
+      boolean result =
+          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkAddr,
+              clusterName));
+      Assert.assertTrue(result);
+
+    } finally {
+      participant.disconnect();
+      zkServerRef.get().shutdown();
+    }
+  }
+
+  @Test
+  public void testHelixManagerStateListenerCallback() throws Exception {
+    final int zkPort = TestHelper.getRandomPort();
+    final String zkAddr = String.format("localhost:%d", zkPort);
+    final ZkServer zkServer = TestHelper.startZkServer(zkAddr);
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    final String clusterName = className + "_" + methodName;
+
+    // Init onDisconnectedFlag to check if callback is triggered
+    final AtomicReference<Boolean> onDisconnectedFlag = new AtomicReference<>(false);
+    final AtomicReference<Boolean> onConnectedFlag = new AtomicReference<>(false);
+
+    // Setup cluster
+    LOG.info("Setup clusters");
+    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
+    clusterSetup.addCluster(clusterName, true);
+    // For fast test, set short timeout
+    System.setProperty("zk.connection.timeout", "2000");
+    System.setProperty("zk.connectionReEstablishment.timeout", "1000");
+
+    // Registers and starts controller, register listener for disconnect handling
+    LOG.info("Starts controller");
+    final ZKHelixManager controller =
+        (ZKHelixManager) HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.CONTROLLER, zkAddr,
+            new HelixManagerStateListener() {
+              @Override
+              public void onConnected(HelixManager helixManager) throws Exception {
+                Assert.assertEquals(helixManager.getClusterName(), clusterName);
+                onConnectedFlag.getAndSet(true);
+              }
+
+              @Override
+              public void onDisconnected(HelixManager helixManager, Throwable error) throws Exception {
+                Assert.assertEquals(helixManager.getClusterName(), clusterName);
+                onDisconnectedFlag.getAndSet(true);
+              }
+            });
+
+    try {
+      controller.connect();
+      // check onConnected() is triggered
+      Assert.assertTrue(onConnectedFlag.getAndSet(false));
+
+      // 1. shutdown zkServer and check if handler trigger callback
+      zkServer.shutdown();
+      // Simulate a retry in ZkClient that will not succeed
+      WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
+      controller._zkclient.process(event);
+      Assert.assertFalse(controller._zkclient.waitUntilConnected(10000, TimeUnit.MILLISECONDS));
+      // While retrying, onDisconnectedFlag = false
+      Assert.assertFalse(onDisconnectedFlag.get());
+
+      // 2. restart zkServer and check if handler will recover connection
+      zkServer.start();
+      Assert.assertTrue(controller._zkclient
+          .waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+      Assert.assertTrue(controller.isConnected());
+
+      // New propertyStore should be in good state
+      ZkHelixPropertyStore propertyStore = controller.getHelixPropertyStore();
+      propertyStore.get("/", null, 0);
+
+      // Inject expire to test handler
+      // onDisconnectedFlag should be set within onDisconnected handler
+      controller.handleSessionEstablishmentError(new Exception("For testing"));
+      Assert.assertTrue(onDisconnectedFlag.get());
+      Assert.assertFalse(onConnectedFlag.get());
+      Assert.assertFalse(controller.isConnected());
+
+      // Verify the manager is disconnected: getting the property store is expected to throw
+      try {
+        controller.getHelixPropertyStore();
+      } catch (HelixException e) {
+        // Expected exception
+        System.out.println(e.getMessage());
+      }
+    } finally {
+      controller.disconnect();
+      zkServer.shutdown();
+      System.clearProperty("zk.connection.timeout");
+      System.clearProperty("zk.connectionReEstablishment.timeout");
+    }
+  }
+
+  public static final class SimpleStateModel extends StateModel {
+
+    private final CountDownLatch latch;
+
+    public SimpleStateModel(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
+      // LOG.info(HelixUtils.toString(message));
+      LOG.info("message: " + message);
+      latch.countDown();
+    }
+  }
+}