You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/27 20:36:39 UTC

[helix] 30/37: Implement timeout for auto reconnect (#2409)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 632a3b7f759d1b8b199e1100b8cf362cb6eccd28
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Tue Apr 4 13:13:46 2023 -0700

    Implement timeout for auto reconnect (#2409)
    
    Implement timeout for auto reconnect
    ZkClient automatically reconnects when it gets disconnected from ZooKeeper, but there is no timeout if the connection cannot be re-established. In ZkMetaClient, a separate thread monitors the reconnect status and closes ZkClient if the connection cannot be re-established before the timeout.
---
 .../helix/metaclient/impl/zk/ZkMetaClient.java     | 113 ++++++++++++-
 .../impl/zk/factory/ZkMetaClientConfig.java        |   1 -
 .../policy/ExponentialBackoffReconnectPolicy.java  |  20 ++-
 .../policy/MetaClientReconnectPolicy.java          |   3 +-
 .../metaclient/policy/NoRetryReconnectPolicy.java  |  36 -----
 .../zk/TestConnectStateChangeListenerAndRetry.java | 180 +++++++++++++++++++++
 .../helix/metaclient/impl/zk/TestZkMetaClient.java |  32 ----
 .../metaclient/impl/zk/ZkMetaClientTestBase.java   |   3 +-
 .../apache/helix/zookeeper/zkclient/ZkClient.java  |   1 +
 9 files changed, 304 insertions(+), 85 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 9614e53d6..84c329fe4 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -19,9 +19,15 @@ package org.apache.helix.metaclient.impl.zk;
  * under the License.
  */
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.AsyncCallback;
 import org.apache.helix.metaclient.api.ChildChangeListener;
@@ -47,33 +53,47 @@ import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
 import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
 
+
 public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
   private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class);
   private final ZkClient _zkClient;
   private final long _initConnectionTimeout;
   private final long _reconnectTimeout;
 
+  // After ZkClient gets disconnected from ZK server, it keeps retrying connection until connection
+  // is re-established or ZkClient is closed. We need a separate thread to monitor ZkClient
+  // reconnect and close ZkClient if it is not able to reconnect within user specified timeout.
+  private final ScheduledExecutorService _zkClientReconnectMonitor;
+  private ScheduledFuture<?> _reconnectMonitorFuture;
+  private ReconnectStateChangeListener _reconnectStateChangeListener;
+  // Lock all activities related to ZkClient connection
+  private ReentrantLock _zkClientConnectionMutex = new ReentrantLock();
+
+
   public ZkMetaClient(ZkMetaClientConfig config) {
     _initConnectionTimeout = config.getConnectionInitTimeoutInMillis();
     _reconnectTimeout = config.getMetaClientReconnectPolicy().getAutoReconnectTimeout();
     // TODO: Right new ZkClient reconnect using exp backoff with fixed max backoff interval. We should
-    // 1. Allow user to config max backoff interval (next PR)
-    // 2. Allow user to config reconnect policy (future PR)
+    // Allow user to config reconnect policy
     _zkClient = new ZkClient(
         new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()),
         (int) _initConnectionTimeout, _reconnectTimeout /*use reconnect timeout for retry timeout*/,
         config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(),
         config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false);
+    _zkClientReconnectMonitor = Executors.newSingleThreadScheduledExecutor();
+    _reconnectStateChangeListener = new ReconnectStateChangeListener();
   }
 
   @Override
@@ -266,18 +286,25 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
 
   @Override
   public void connect() {
-    // TODO: throws IllegalStateException when already connected
     try {
+      _zkClientConnectionMutex.lock();
       _zkClient.connect(_initConnectionTimeout, _zkClient);
+      // Register _reconnectStateChangeListener as a state change listener to react to ZkClient
+      // connect state change events. When ZkClient is disconnected from ZK, it keeps auto
+      // reconnecting until ZkClient is closed or the connection is re-established.
+      // We need to close ZkClient ourselves when the user sets a retry connection timeout.
+      _zkClient.subscribeStateChanges(_reconnectStateChangeListener);
     } catch (ZkException e) {
       throw translateZkExceptionToMetaclientException(e);
+    } finally {
+      _zkClientConnectionMutex.unlock();
     }
   }
 
   @Override
   public void disconnect() {
-    // TODO: This is a temp impl for test only. no proper interrupt handling and error handling.
-    _zkClient.close();
+    cleanUpAndClose(true, true);
+    _zkClientReconnectMonitor.shutdownNow();
   }
 
   @Override
@@ -394,4 +421,80 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
   public T deserialize(byte[] bytes, String path) {
     return _zkClient.deserialize(bytes, path);
   }
+
+  /**
+   * A clean up method called when connect state change or MetaClient is closing.
+   * @param cancel If we want to cancel the reconnect monitor thread.
+   * @param close If we want to close ZkClient.
+   */
+  private void cleanUpAndClose(boolean cancel, boolean close) {
+    _zkClientConnectionMutex.lock();
+    try {
+      if (close && !_zkClient.isClosed()) {
+        _zkClient.close();
+        // TODO: need to unsubscribe all persist watcher from ZK
+        // Add this in ZkClient when persist watcher change is in
+        // Also need to manually send CLOSED state change to state
+        // change listener (in change adapter)
+        LOG.info("ZkClient is closed");
+      }
+
+      if (cancel && _reconnectMonitorFuture != null) {
+        _reconnectMonitorFuture.cancel(true);
+        LOG.info("ZkClient reconnect monitor thread is canceled");
+      }
+
+    } finally {
+      _zkClientConnectionMutex.unlock();
+    }
+  }
+
+  private class ReconnectStateChangeListener implements IZkStateListener {
+    // Schedule a monitor to track ZkClient auto reconnect when Disconnected
+    // Cancel the monitor thread when connected.
+    @Override
+    public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+      if (state == Watcher.Event.KeeperState.Disconnected) {
+        // Disconnected. Schedule a reconnect monitor unless one is already pending.
+        _zkClientConnectionMutex.lockInterruptibly();
+        try {
+          if (_reconnectMonitorFuture == null || _reconnectMonitorFuture.isCancelled()
+              || _reconnectMonitorFuture.isDone()) {
+            _reconnectMonitorFuture = _zkClientReconnectMonitor.schedule(() -> {
+              if (!_zkClient.getConnection().getZookeeperState().isConnected()) {
+                cleanUpAndClose(false, true);
+              }
+            }, _reconnectTimeout, TimeUnit.MILLISECONDS);
+            LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after {}",
+                _reconnectTimeout);
+          }
+        } finally {
+          _zkClientConnectionMutex.unlock();
+        }
+      } else if (state == Watcher.Event.KeeperState.SyncConnected
+          || state == Watcher.Event.KeeperState.ConnectedReadOnly) {
+        cleanUpAndClose(true, false);
+        LOG.info("ZkClient is SyncConnected, reconnect monitor thread is canceled (if any)");
+      }
+    }
+
+    // Cancel the monitor thread when connected.
+    @Override
+    public void handleNewSession(String sessionId) throws Exception {
+      cleanUpAndClose(true, false);
+      LOG.info("New session initiated in ZkClient, reconnect monitor thread is canceled (if any)");
+    }
+
+    // Cancel the monitor thread and close ZkClient when connect error.
+    @Override
+    public void handleSessionEstablishmentError(Throwable error) throws Exception {
+      cleanUpAndClose(true, true);
+      LOG.info("New session initiated in ZkClient, reconnect monitor thread is canceled (if any)");
+    }
+  }
+
+  @VisibleForTesting
+  ZkClient getZkClient() {
+    return _zkClient;
+  }
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
index 82c9bb20a..63d6ff07c 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
@@ -130,7 +130,6 @@ public class ZkMetaClientConfig extends MetaClientConfig {
     @Override
     public ZkMetaClientConfig build() {
       validate();
-
       return new ZkMetaClientConfig(_connectionAddress, _connectionInitTimeoutInMillis,
           _sessionTimeoutInMillis, _metaClientReconnectPolicy, _enableAuth,
           MetaClientConfig.StoreType.ZOOKEEPER, _monitorType, _monitorKey, _monitorInstanceName,
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java
index 81e0c44f7..7c5829e10 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java
@@ -21,6 +21,7 @@ package org.apache.helix.metaclient.policy;
 
 import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy;
 
+import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_AUTO_RECONNECT_TIMEOUT_MS;
 import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
 import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
 
@@ -32,22 +33,25 @@ import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_
  */
 public class ExponentialBackoffReconnectPolicy implements MetaClientReconnectPolicy {
 
-  private final long _maxBackOffInterval;
-  private final long _initBackoffInterval;
+  private final long _autoReconnectTimeout;
 
   @Override
   public RetryPolicyName getPolicyName() {
     return RetryPolicyName.EXP_BACKOFF;
   }
 
-  public ExponentialBackoffReconnectPolicy() {
-    _initBackoffInterval = DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
-    _maxBackOffInterval = DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+  @Override
+  public long getAutoReconnectTimeout() {
+    return _autoReconnectTimeout;
   }
 
-  public ExponentialBackoffReconnectPolicy(long maxBackOffInterval, long initBackoffInterval) {
-    _maxBackOffInterval = maxBackOffInterval;
-    _initBackoffInterval = initBackoffInterval;
+  public ExponentialBackoffReconnectPolicy() {
+    _autoReconnectTimeout = DEFAULT_AUTO_RECONNECT_TIMEOUT_MS;
+  }
 
+  public ExponentialBackoffReconnectPolicy(long autoReconnectTimeout) {
+    _autoReconnectTimeout = autoReconnectTimeout;
   }
+
+  // TODO: Allow user to pass maxBackOffInterval and initBackoffInterval.
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java
index be80fdd59..5ef56988e 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java
@@ -30,14 +30,13 @@ import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_
 public interface MetaClientReconnectPolicy {
 
   enum RetryPolicyName {
-    NO_RETRY,
     EXP_BACKOFF,
     LINEAR_BACKOFF
   }
 
   RetryPolicyName getPolicyName();
 
-  default  long getAutoReconnectTimeout() {
+  default long getAutoReconnectTimeout() {
     return DEFAULT_AUTO_RECONNECT_TIMEOUT_MS;
   }
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/policy/NoRetryReconnectPolicy.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/NoRetryReconnectPolicy.java
deleted file mode 100644
index f81273b3a..000000000
--- a/meta-client/src/main/java/org/apache/helix/metaclient/policy/NoRetryReconnectPolicy.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.helix.metaclient.policy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy;
-
-
-/**
- * Policy to define client re-establish connection behavior when connection to underlying metadata
- * store is expired.
- * If this retry policy is passed to MetaClient, no auto retry connection will be issued when
- * connection lost or expired.
- */
-public class NoRetryReconnectPolicy implements MetaClientReconnectPolicy {
-  @Override
-  public RetryPolicyName getPolicyName() {
-    return RetryPolicyName.NO_RETRY;
-  }
-}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
new file mode 100644
index 000000000..36b9b2131
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
@@ -0,0 +1,180 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.policy.ExponentialBackoffReconnectPolicy;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
+import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+
+
+public class TestConnectStateChangeListenerAndRetry  {
+  protected static final String ZK_ADDR = "localhost:2181";
+  protected static ZkServer _zkServer;
+
+  private static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000;
+  private static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000;
+  private static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000;
+
+  /**
+   * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly.
+   * This needs to be done in a separate thread to simulate the ZkClient eventThread.
+   */
+  private static void simulateZkStateReconnected(ZkClient zkClient) throws InterruptedException {
+      WatchedEvent event =
+          new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected,
+              null);
+      zkClient.process(event);
+
+      Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);
+
+      event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected,
+          null);
+      zkClient.process(event);
+  }
+
+  @BeforeSuite
+  public void prepare() {
+    System.out.println("START TestConnectStateChangeListenerAndRetry at " + new Date(System.currentTimeMillis()));
+    // start local zookeeper server
+    _zkServer = ZkMetaClientTestBase.startZkServer(ZK_ADDR);
+  }
+
+  @AfterSuite
+  public void cleanUp() {
+    System.out.println("END TestConnectStateChangeListenerAndRetry at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testConnectState() {
+    System.out.println("STARTING TestConnectStateChangeListenerAndRetry.testConnectState at " + new Date(System.currentTimeMillis()));
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClientReconnectTest()) {
+      zkMetaClient.connect();
+      zkMetaClient.connect();
+      Assert.fail("The second connect should throw IllegalStateException");
+    } catch (Exception ex) {
+      Assert.assertTrue(ex instanceof IllegalStateException);
+      Assert.assertEquals(ex.getMessage(), "ZkClient is not in init state. connect() has already been called.");
+    }
+    System.out.println("END TestConnectStateChangeListenerAndRetry.testConnectState at " + new Date(System.currentTimeMillis()));
+  }
+
+  // test mock zkclient event
+  @Test(dependsOnMethods = "testConnectState")
+  public void testReConnectSucceed() throws InterruptedException {
+    System.out.println("STARTING TestConnectStateChangeListenerAndRetry.testReConnectSucceed at " + new Date(System.currentTimeMillis()));
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClientReconnectTest()) {
+      CountDownLatch countDownLatch = new CountDownLatch(1);
+
+      zkMetaClient.connect();
+      // We need a separate thread to simulate reconnect. In ZkClient there is an assertion to check
+      // that reconnect and CRUDs are not in the same thread (so one does not block the other).
+      Executors.newSingleThreadExecutor().execute(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            simulateZkStateReconnected(zkMetaClient.getZkClient());
+          } catch (InterruptedException e) {
+           Assert.fail("Exception in simulateZkStateReconnected", e);
+          }
+          countDownLatch.countDown();
+        }
+      });
+      countDownLatch.await(5000, TimeUnit.SECONDS);
+      Thread.sleep(AUTO_RECONNECT_WAIT_TIME_EXD);
+      // When ZK reconnect happens within timeout window, zkMetaClient should be able to perform CRUD.
+      Assert.assertTrue(zkMetaClient.getZkClient().getConnection().getZookeeperState().isConnected());
+      zkMetaClient.create("/key", "value");
+      Assert.assertEquals(zkMetaClient.get("/key"), "value");
+    }
+    System.out.println("END TestConnectStateChangeListenerAndRetry.testReConnectSucceed at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test(dependsOnMethods = "testReConnectSucceed")
+  public void testConnectStateChangeListener() throws Exception {
+    System.out.println("START TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " + new Date(System.currentTimeMillis()));
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClientReconnectTest()) {
+      CountDownLatch countDownLatch = new CountDownLatch(1);
+      final MetaClientInterface.ConnectState[] connectState =
+          new MetaClientInterface.ConnectState[2];
+      ConnectStateChangeListener listener = new ConnectStateChangeListener() {
+        @Override
+        public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState,
+            MetaClientInterface.ConnectState currentState) throws Exception {
+          connectState[0] = prevState;
+          connectState[1] = currentState;
+          countDownLatch.countDown();
+        }
+
+        @Override
+        public void handleConnectionEstablishmentError(Throwable error) throws Exception {
+
+        }
+      };
+      Assert.assertTrue(zkMetaClient.subscribeStateChanges(listener));
+      zkMetaClient.connect();
+      countDownLatch.await(5000, TimeUnit.SECONDS);
+      Assert.assertEquals(connectState[0], MetaClientInterface.ConnectState.NOT_CONNECTED);
+      Assert.assertEquals(connectState[1], MetaClientInterface.ConnectState.CONNECTED);
+
+      _zkServer.shutdown();
+      Thread.sleep(AUTO_RECONNECT_WAIT_TIME_EXD);
+      Assert.assertEquals(connectState[0], MetaClientInterface.ConnectState.CONNECTED);
+      Assert.assertEquals(connectState[1], MetaClientInterface.ConnectState.DISCONNECTED);
+
+      try {
+        zkMetaClient.create("/key", "value");
+        Assert.fail("Create call after close should throw IllegalStateException");
+      } catch (Exception ex) {
+        Assert.assertTrue(ex.getCause() instanceof IllegalStateException);
+      }
+    }
+    System.out.println("END TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " + new Date(System.currentTimeMillis()));
+  }
+
+  static ZkMetaClient<String> createZkMetaClientReconnectTest() {
+    ZkMetaClientConfig config =
+        new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
+            .setMetaClientReconnectPolicy(
+                new ExponentialBackoffReconnectPolicy(
+                     AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST))
+            .build();
+    return new ZkMetaClient<>(config);
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index cdb894b34..6eb624fb8 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -52,9 +52,6 @@ import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERS
 
 public class TestZkMetaClient extends ZkMetaClientTestBase{
 
-  private static final String ZK_ADDR = "localhost:2183";
-  private static final int DEFAULT_TIMEOUT_MS = 1000;
-  private static final String ENTRY_STRING_VALUE = "test-value";
   private static final String TRANSACTION_TEST_PARENT_PATH = "/transactionOpTestPath";
   private static final String TEST_INVALID_PATH = "/_invalid/a/b/c";
 
@@ -351,35 +348,6 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{
     }
   }
 
-  @Test
-  public void testConnectStateChangeListener() throws Exception {
-    final String basePath = "/TestZkMetaClient_testConnectionStateChangeListener";
-    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
-      CountDownLatch countDownLatch = new CountDownLatch(1);
-      final MetaClientInterface.ConnectState[] connectState =
-          new MetaClientInterface.ConnectState[2];
-      ConnectStateChangeListener listener = new ConnectStateChangeListener() {
-        @Override
-        public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState,
-            MetaClientInterface.ConnectState currentState) throws Exception {
-          connectState[0] = prevState;
-          connectState[1] = currentState;
-          countDownLatch.countDown();
-        }
-
-        @Override
-        public void handleConnectionEstablishmentError(Throwable error) throws Exception {
-
-        }
-      };
-      Assert.assertTrue(zkMetaClient.subscribeStateChanges(listener));
-      zkMetaClient.connect();
-      countDownLatch.await(5000, TimeUnit.SECONDS);
-      Assert.assertEquals(connectState[0], MetaClientInterface.ConnectState.NOT_CONNECTED);
-      Assert.assertEquals(connectState[1], MetaClientInterface.ConnectState.CONNECTED);
-    }
-  }
-
   /**
    * Transactional op calls zk.multi() with a set of ops (operations)
    * and the return values are converted into metaclient opResults.
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
index 51c655602..e00eb9e83 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
 import org.apache.helix.zookeeper.zkclient.ZkServer;
@@ -68,7 +69,7 @@ public abstract class ZkMetaClientTestBase {
     return new ZkMetaClient<>(config);
   }
 
-  protected static ZkServer startZkServer(final String zkAddress) {
+  public static ZkServer startZkServer(final String zkAddress) {
     String zkDir = zkAddress.replace(':', '_');
     final String logDir = "/tmp/" + zkDir + "/logs";
     final String dataDir = "/tmp/" + zkDir + "/dataDir";
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 51904ede6..f87edda2b 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -2615,6 +2615,7 @@ public class ZkClient implements Watcher {
       }
       _eventThread.interrupt();
       _eventThread.join(2000);
+      // TODO: Closing _event thread here will miss final `CLOSE` state change.
       if (isManagingZkConnection()) {
         LOG.info("Closing zkclient uid:{}, zk:{}", _uid, ((ZkConnection) connection).getZookeeper());
         connection.close();