You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/27 20:36:39 UTC
[helix] 30/37: Implement timeout for auto reconnect (#2409)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 632a3b7f759d1b8b199e1100b8cf362cb6eccd28
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Tue Apr 4 13:13:46 2023 -0700
Implement timeout for auto reconnect (#2409)
Implement timeout for auto reconnect
ZkClient automatically reconnects when it gets disconnected from ZooKeeper, but there is no timeout when the connection cannot be re-established. In ZkMetaClient, we add a separate thread that monitors the reconnect status and closes ZkClient when the connection is not re-established before the timeout.
---
.../helix/metaclient/impl/zk/ZkMetaClient.java | 113 ++++++++++++-
.../impl/zk/factory/ZkMetaClientConfig.java | 1 -
.../policy/ExponentialBackoffReconnectPolicy.java | 20 ++-
.../policy/MetaClientReconnectPolicy.java | 3 +-
.../metaclient/policy/NoRetryReconnectPolicy.java | 36 -----
.../zk/TestConnectStateChangeListenerAndRetry.java | 180 +++++++++++++++++++++
.../helix/metaclient/impl/zk/TestZkMetaClient.java | 32 ----
.../metaclient/impl/zk/ZkMetaClientTestBase.java | 3 +-
.../apache/helix/zookeeper/zkclient/ZkClient.java | 1 +
9 files changed, 304 insertions(+), 85 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 9614e53d6..84c329fe4 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -19,9 +19,15 @@ package org.apache.helix.metaclient.impl.zk;
* under the License.
*/
+import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.metaclient.api.AsyncCallback;
import org.apache.helix.metaclient.api.ChildChangeListener;
@@ -47,33 +53,47 @@ import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
+
public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class);
private final ZkClient _zkClient;
private final long _initConnectionTimeout;
private final long _reconnectTimeout;
+ // After ZkClient gets disconnected from ZK server, it keeps retrying connection until connection
+ // is re-established or ZkClient is closed. We need a separate thread to monitor ZkClient
+ // reconnect and close ZkClient if it is not able to reconnect within the user specified timeout.
+ private final ScheduledExecutorService _zkClientReconnectMonitor;
+ // Pending reconnect timeout check; only read or replaced while holding _zkClientConnectionMutex.
+ private ScheduledFuture<?> _reconnectMonitorFuture;
+ private final ReconnectStateChangeListener _reconnectStateChangeListener;
+ // Lock all activities related to ZkClient connection
+ private final ReentrantLock _zkClientConnectionMutex = new ReentrantLock();
+
public ZkMetaClient(ZkMetaClientConfig config) {
_initConnectionTimeout = config.getConnectionInitTimeoutInMillis();
_reconnectTimeout = config.getMetaClientReconnectPolicy().getAutoReconnectTimeout();
// TODO: Right new ZkClient reconnect using exp backoff with fixed max backoff interval. We should
- // 1. Allow user to config max backoff interval (next PR)
- // 2. Allow user to config reconnect policy (future PR)
+ // Allow user to config reconnect policy
_zkClient = new ZkClient(
new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()),
(int) _initConnectionTimeout, _reconnectTimeout /*use reconnect timeout for retry timeout*/,
config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(),
config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false);
+ _zkClientReconnectMonitor = Executors.newSingleThreadScheduledExecutor();
+ _reconnectStateChangeListener = new ReconnectStateChangeListener();
}
@Override
@@ -266,18 +286,25 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
@Override
public void connect() {
- // TODO: throws IllegalStateException when already connected
+ _zkClientConnectionMutex.lock();
try {
_zkClient.connect(_initConnectionTimeout, _zkClient);
+ // Register _reconnectStateChangeListener to react to ZkClient connect state changes.
+ // After ZkClient is disconnected from ZK, it keeps reconnecting automatically until the
+ // connection is re-established or ZkClient is closed; the listener closes ZkClient when
+ // the connection is not re-established within the configured reconnect timeout.
+ _zkClient.subscribeStateChanges(_reconnectStateChangeListener);
} catch (ZkException e) {
throw translateZkExceptionToMetaclientException(e);
+ } finally {
+ _zkClientConnectionMutex.unlock();
}
}
@Override
public void disconnect() {
- // TODO: This is a temp impl for test only. no proper interrupt handling and error handling.
- _zkClient.close();
+ cleanUpAndClose(true, true);
+ _zkClientReconnectMonitor.shutdownNow();
}
@Override
@@ -394,4 +421,80 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
public T deserialize(byte[] bytes, String path) {
return _zkClient.deserialize(bytes, path);
}
+
+ /**
+ * Cancels the scheduled reconnect monitor and/or closes ZkClient while holding _zkClientConnectionMutex.
+ * @param cancel whether to cancel the scheduled reconnect monitor task, if one has been scheduled.
+ * @param close whether to close ZkClient; skipped when ZkClient is already closed.
+ */
+ private void cleanUpAndClose(boolean cancel, boolean close) {
+ _zkClientConnectionMutex.lock();
+ try {
+ if (close && !_zkClient.isClosed()) {
+ _zkClient.close();
+ // TODO: unsubscribe all persistent watchers from ZK; add this in ZkClient once
+ // persistent watcher support is in.
+ // Also need to manually send the CLOSED state change to state
+ // change listeners (in the change adapter).
+ LOG.info("ZkClient is closed");
+ }
+ // cancel(true) also interrupts the monitor task if it is already running.
+ if (cancel && _reconnectMonitorFuture != null) {
+ _reconnectMonitorFuture.cancel(true);
+ LOG.info("ZkClient reconnect monitor thread is canceled");
+ }
+
+ } finally {
+ _zkClientConnectionMutex.unlock();
+ }
+ }
+
+ private class ReconnectStateChangeListener implements IZkStateListener {
+ // Schedules a reconnect monitor when ZkClient becomes Disconnected and cancels it once connected;
+ // the monitor closes ZkClient if the connection is not re-established within _reconnectTimeout.
+ @Override
+ public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+ if (state == Watcher.Event.KeeperState.Disconnected) {
+ // Disconnected: schedule a one-shot check that closes ZkClient if it is still not connected after _reconnectTimeout.
+ _zkClientConnectionMutex.lockInterruptibly();
+ try {
+ if (_reconnectMonitorFuture == null || _reconnectMonitorFuture.isCancelled()
+ || _reconnectMonitorFuture.isDone()) {
+ _reconnectMonitorFuture = _zkClientReconnectMonitor.schedule(() -> {
+ if (!_zkClient.getConnection().getZookeeperState().isConnected()) {
+ cleanUpAndClose(false, true);
+ }
+ }, _reconnectTimeout, TimeUnit.MILLISECONDS);
+ LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after {} ms",
+ _reconnectTimeout);
+ }
+ } finally {
+ _zkClientConnectionMutex.unlock();
+ }
+ } else if (state == Watcher.Event.KeeperState.SyncConnected
+ || state == Watcher.Event.KeeperState.ConnectedReadOnly) {
+ cleanUpAndClose(true, false);
+ LOG.info("ZkClient is {}, reconnect monitor thread is canceled (if any)", state);
+ }
+ }
+
+ // Cancel the monitor when a new session is established.
+ @Override
+ public void handleNewSession(String sessionId) throws Exception {
+ cleanUpAndClose(true, false);
+ LOG.info("New session initiated in ZkClient, reconnect monitor thread is canceled (if any)");
+ }
+
+ // Cancel the monitor thread and close ZkClient when connect error.
+ @Override
+ public void handleSessionEstablishmentError(Throwable error) throws Exception {
+ cleanUpAndClose(true, true);
+ LOG.warn("ZkClient failed to establish a new session, ZkClient is closed and reconnect monitor thread is canceled (if any)", error);
+ }
+ }
+
+ @VisibleForTesting
+ ZkClient getZkClient() {
+ return _zkClient;
+ }
}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
index 82c9bb20a..63d6ff07c 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
@@ -130,7 +130,6 @@ public class ZkMetaClientConfig extends MetaClientConfig {
@Override
public ZkMetaClientConfig build() {
validate();
-
return new ZkMetaClientConfig(_connectionAddress, _connectionInitTimeoutInMillis,
_sessionTimeoutInMillis, _metaClientReconnectPolicy, _enableAuth,
MetaClientConfig.StoreType.ZOOKEEPER, _monitorType, _monitorKey, _monitorInstanceName,
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java
index 81e0c44f7..7c5829e10 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java
@@ -21,6 +21,7 @@ package org.apache.helix.metaclient.policy;
import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy;
+import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_AUTO_RECONNECT_TIMEOUT_MS;
import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
@@ -32,22 +33,25 @@ import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_
*/
public class ExponentialBackoffReconnectPolicy implements MetaClientReconnectPolicy {
- private final long _maxBackOffInterval;
- private final long _initBackoffInterval;
+ private final long _autoReconnectTimeout;
@Override
public RetryPolicyName getPolicyName() {
return RetryPolicyName.EXP_BACKOFF;
}
- public ExponentialBackoffReconnectPolicy() {
- _initBackoffInterval = DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
- _maxBackOffInterval = DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+ @Override
+ public long getAutoReconnectTimeout() {
+ return _autoReconnectTimeout;
}
- public ExponentialBackoffReconnectPolicy(long maxBackOffInterval, long initBackoffInterval) {
- _maxBackOffInterval = maxBackOffInterval;
- _initBackoffInterval = initBackoffInterval;
+ public ExponentialBackoffReconnectPolicy() {
+ this(DEFAULT_AUTO_RECONNECT_TIMEOUT_MS);
+ }
+ public ExponentialBackoffReconnectPolicy(long autoReconnectTimeout) {
+ _autoReconnectTimeout = autoReconnectTimeout;
}
+
+ // TODO: Allow user to pass maxBackOffInterval and initBackoffInterval.
}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java
index be80fdd59..5ef56988e 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java
@@ -30,14 +30,13 @@ import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_
public interface MetaClientReconnectPolicy {
enum RetryPolicyName {
- NO_RETRY,
EXP_BACKOFF,
LINEAR_BACKOFF
}
RetryPolicyName getPolicyName();
- default long getAutoReconnectTimeout() {
+ default long getAutoReconnectTimeout() {
return DEFAULT_AUTO_RECONNECT_TIMEOUT_MS;
}
}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/policy/NoRetryReconnectPolicy.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/NoRetryReconnectPolicy.java
deleted file mode 100644
index f81273b3a..000000000
--- a/meta-client/src/main/java/org/apache/helix/metaclient/policy/NoRetryReconnectPolicy.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package org.apache.helix.metaclient.policy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy;
-
-
-/**
- * Policy to define client re-establish connection behavior when connection to underlying metadata
- * store is expired.
- * If this retry policy is passed to MetaClient, no auto retry connection will be issued when
- * connection lost or expired.
- */
-public class NoRetryReconnectPolicy implements MetaClientReconnectPolicy {
- @Override
- public RetryPolicyName getPolicyName() {
- return RetryPolicyName.NO_RETRY;
- }
-}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
new file mode 100644
index 000000000..36b9b2131
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java
@@ -0,0 +1,180 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.policy.ExponentialBackoffReconnectPolicy;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+public class TestConnectStateChangeListenerAndRetry {
+ protected static final String ZK_ADDR = "localhost:2181";
+ protected static ZkServer _zkServer;
+
+ private static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000;
+ private static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000;
+ private static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000;
+
+ /**
+ * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly.
+ * This needs to be done in a separate thread to simulate ZkClient eventThread.
+ */
+ private static void simulateZkStateReconnected(ZkClient zkClient) throws InterruptedException {
+ WatchedEvent event =
+ new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected,
+ null);
+ zkClient.process(event);
+
+ Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);
+
+ event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected,
+ null);
+ zkClient.process(event);
+ }
+
+ @BeforeSuite
+ public void prepare() {
+ System.out.println("START TestConnectStateChangeListenerAndRetry at " + new Date(System.currentTimeMillis()));
+ // start local zookeeper server
+ _zkServer = ZkMetaClientTestBase.startZkServer(ZK_ADDR);
+ }
+
+ @AfterSuite
+ public void cleanUp() {
+ System.out.println("END TestConnectStateChangeListenerAndRetry at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testConnectState() {
+ System.out.println("STARTING TestConnectStateChangeListenerAndRetry.testConnectState at " + new Date(System.currentTimeMillis()));
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClientReconnectTest()) {
+ zkMetaClient.connect();
+ zkMetaClient.connect();
+ Assert.fail("The second connect should throw IllegalStateException");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex instanceof IllegalStateException);
+ Assert.assertEquals(ex.getMessage(), "ZkClient is not in init state. connect() has already been called.");
+ }
+ System.out.println("END TestConnectStateChangeListenerAndRetry.testConnectState at " + new Date(System.currentTimeMillis()));
+ }
+
+ // test mock zkclient event
+ @Test(dependsOnMethods = "testConnectState")
+ public void testReConnectSucceed() throws InterruptedException {
+ System.out.println("STARTING TestConnectStateChangeListenerAndRetry.testReConnectSucceed at " + new Date(System.currentTimeMillis()));
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClientReconnectTest()) {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ zkMetaClient.connect();
+ // We need a separate thread to simulate reconnect. In ZkClient there is assertion to check
+ // reconnect and CRUDs are not in the same thread. (So one does not block another)
+ ExecutorService simulator = Executors.newSingleThreadExecutor();
+ try {
+ simulator.execute(() -> {
+ try {
+ simulateZkStateReconnected(zkMetaClient.getZkClient());
+ countDownLatch.countDown();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ Assert.assertTrue(countDownLatch.await(5, TimeUnit.SECONDS), "Reconnect simulation did not complete");
+ } finally {
+ simulator.shutdownNow();
+ }
+ Thread.sleep(AUTO_RECONNECT_WAIT_TIME_EXD);
+ // When ZK reconnect happens within timeout window, zkMetaClient should be able to perform CRUD.
+ Assert.assertTrue(zkMetaClient.getZkClient().getConnection().getZookeeperState().isConnected());
+ zkMetaClient.create("/key", "value");
+ Assert.assertEquals(zkMetaClient.get("/key"), "value");
+ }
+ System.out.println("END TestConnectStateChangeListenerAndRetry.testReConnectSucceed at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test(dependsOnMethods = "testReConnectSucceed")
+ public void testConnectStateChangeListener() throws Exception {
+ System.out.println("START TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " + new Date(System.currentTimeMillis()));
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClientReconnectTest()) {
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ final MetaClientInterface.ConnectState[] connectState =
+ new MetaClientInterface.ConnectState[2];
+ ConnectStateChangeListener listener = new ConnectStateChangeListener() {
+ @Override
+ public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState,
+ MetaClientInterface.ConnectState currentState) throws Exception {
+ connectState[0] = prevState;
+ connectState[1] = currentState;
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void handleConnectionEstablishmentError(Throwable error) throws Exception {
+
+ }
+ };
+ Assert.assertTrue(zkMetaClient.subscribeStateChanges(listener));
+ zkMetaClient.connect();
+ Assert.assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+ Assert.assertEquals(connectState[0], MetaClientInterface.ConnectState.NOT_CONNECTED);
+ Assert.assertEquals(connectState[1], MetaClientInterface.ConnectState.CONNECTED);
+
+ _zkServer.shutdown();
+ Thread.sleep(AUTO_RECONNECT_WAIT_TIME_EXD);
+ Assert.assertEquals(connectState[0], MetaClientInterface.ConnectState.CONNECTED);
+ Assert.assertEquals(connectState[1], MetaClientInterface.ConnectState.DISCONNECTED);
+
+ try {
+ zkMetaClient.create("/key", "value");
+ Assert.fail("Create call after close should throw IllegalStateException");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex.getCause() instanceof IllegalStateException);
+ }
+ }
+ System.out.println("END TestConnectStateChangeListenerAndRetry.testConnectStateChangeListener at " + new Date(System.currentTimeMillis()));
+ }
+
+ /**
+ * Creates a ZkMetaClient whose ExponentialBackoffReconnectPolicy uses
+ * AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST as the auto reconnect timeout.
+ */
+ static ZkMetaClient<String> createZkMetaClientReconnectTest() {
+ ZkMetaClientConfig config =
+ new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
+ .setMetaClientReconnectPolicy(
+ new ExponentialBackoffReconnectPolicy(
+ AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST))
+ .build();
+ return new ZkMetaClient<>(config);
+ }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index cdb894b34..6eb624fb8 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -52,9 +52,6 @@ import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERS
public class TestZkMetaClient extends ZkMetaClientTestBase{
- private static final String ZK_ADDR = "localhost:2183";
- private static final int DEFAULT_TIMEOUT_MS = 1000;
- private static final String ENTRY_STRING_VALUE = "test-value";
private static final String TRANSACTION_TEST_PARENT_PATH = "/transactionOpTestPath";
private static final String TEST_INVALID_PATH = "/_invalid/a/b/c";
@@ -351,35 +348,6 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{
}
}
- @Test
- public void testConnectStateChangeListener() throws Exception {
- final String basePath = "/TestZkMetaClient_testConnectionStateChangeListener";
- try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
- CountDownLatch countDownLatch = new CountDownLatch(1);
- final MetaClientInterface.ConnectState[] connectState =
- new MetaClientInterface.ConnectState[2];
- ConnectStateChangeListener listener = new ConnectStateChangeListener() {
- @Override
- public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState,
- MetaClientInterface.ConnectState currentState) throws Exception {
- connectState[0] = prevState;
- connectState[1] = currentState;
- countDownLatch.countDown();
- }
-
- @Override
- public void handleConnectionEstablishmentError(Throwable error) throws Exception {
-
- }
- };
- Assert.assertTrue(zkMetaClient.subscribeStateChanges(listener));
- zkMetaClient.connect();
- countDownLatch.await(5000, TimeUnit.SECONDS);
- Assert.assertEquals(connectState[0], MetaClientInterface.ConnectState.NOT_CONNECTED);
- Assert.assertEquals(connectState[1], MetaClientInterface.ConnectState.CONNECTED);
- }
- }
-
/**
* Transactional op calls zk.multi() with a set of ops (operations)
* and the return values are converted into metaclient opResults.
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
index 51c655602..e00eb9e83 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
import org.apache.helix.zookeeper.zkclient.ZkServer;
@@ -68,7 +69,7 @@ public abstract class ZkMetaClientTestBase {
return new ZkMetaClient<>(config);
}
- protected static ZkServer startZkServer(final String zkAddress) {
+ public static ZkServer startZkServer(final String zkAddress) {
String zkDir = zkAddress.replace(':', '_');
final String logDir = "/tmp/" + zkDir + "/logs";
final String dataDir = "/tmp/" + zkDir + "/dataDir";
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 51904ede6..f87edda2b 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -2615,6 +2615,7 @@ public class ZkClient implements Watcher {
}
_eventThread.interrupt();
_eventThread.join(2000);
+ // TODO: Closing _event thread here will miss final `CLOSE` state change.
if (isManagingZkConnection()) {
LOG.info("Closing zkclient uid:{}, zk:{}", _uid, ((ZkConnection) connection).getZookeeper());
connection.close();