Posted to reviews@helix.apache.org by "rahulrane50 (via GitHub)" <gi...@apache.org> on 2023/03/22 00:28:40 UTC

[GitHub] [helix] rahulrane50 commented on a diff in pull request #2409: Implement timeout for auto reconnect

rahulrane50 commented on code in PR #2409:
URL: https://github.com/apache/helix/pull/2409#discussion_r1144085583


##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -47,33 +52,41 @@
 import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
 import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
 
-public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
+
+public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable, IZkStateListener {

Review Comment:
   I think my comment is about what quincy is suggesting. This is confusing: from my understanding, ZkMetaClient implements MetaClientInterface, which exposes APIs to register/unregister different types of listeners. If we also implement IZkStateListener here and then register this class itself as a state change listener, it will be confusing and may also lead to circular dependencies.
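
   One way to avoid having the client class implement the listener interface itself is to keep the callback in a private inner class. A minimal sketch follows, assuming simplified stand-in types: `StateListener`, `WrappedClient`, and `MetaClient` are hypothetical names for illustration only and are not the Helix or ZooKeeper APIs.

```java
import java.util.ArrayList;
import java.util.List;

class ListenerCompositionSketch {
  /** Hypothetical stand-in for a ZooKeeper-level state listener interface. */
  interface StateListener {
    void handleStateChanged(String state);
  }

  /** Hypothetical stand-in for the wrapped client that dispatches state events. */
  static class WrappedClient {
    private final List<StateListener> listeners = new ArrayList<>();

    void subscribeStateChanges(StateListener listener) {
      listeners.add(listener);
    }

    void fireStateChanged(String state) {
      for (StateListener listener : listeners) {
        listener.handleStateChanged(state);
      }
    }
  }

  /** Hypothetical meta client: its own type does not implement StateListener. */
  static class MetaClient {
    private final WrappedClient client = new WrappedClient();
    private String lastState = "None";

    /** Private inner listener keeps the callback out of the client's public type. */
    private final class ReconnectStateListener implements StateListener {
      @Override
      public void handleStateChanged(String state) {
        lastState = state;
      }
    }

    void connect() {
      // Register the inner listener instead of registering `this`.
      client.subscribeStateChanges(new ReconnectStateListener());
    }

    WrappedClient wrappedClient() {
      return client;
    }

    String lastState() {
      return lastState;
    }
  }

  public static void main(String[] args) {
    MetaClient metaClient = new MetaClient();
    metaClient.connect();
    metaClient.wrappedClient().fireStateChanged("Disconnected");
    System.out.println("Last observed state: " + metaClient.lastState());
  }
}
```

   With this shape, callers of the client cannot invoke the state callback directly, and the listener registration APIs on the public interface stay separate from the internal reconnect handling.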



##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -394,4 +414,69 @@ public byte[] serialize(T data, String path) {
   public T deserialize(byte[] bytes, String path) {
     return _zkClient.deserialize(bytes, path);
   }
+
+  /**
+   * A clean up method called when connect state change or MetaClient is closing.
+   * @param cancel If we want to cancel the reconnect monitor thread.
+   * @param close If we want to close ZkClient.
+   */
+  private void cleanUpAndClose(boolean cancel, boolean close) {
+    _zkClientConnectionMutex.lock();
+
+    if (close && !_zkClient.isClosed()) {
+      _zkClient.close();
+      LOG.info("ZkClient is closed");
+    }
+
+    if (cancel && _reconnectMonitorFuture != null) {
+      _reconnectMonitorFuture.cancel(true);
+      LOG.info("ZkClient reconnect monitor thread is canceled");
+    }
+
+    _zkClientConnectionMutex.unlock();
+  }
+
+  // Schedule a monitor to track ZkClient auto reconnect when Disconnected
+  // Cancel the monitor thread when connected.
+  @Override
+  public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+    if (state == Watcher.Event.KeeperState.Disconnected) {
+      // Expired. start a new event monitoring retry
+      _zkClientConnectionMutex.lockInterruptibly();
+      if (_reconnectMonitorFuture == null || _reconnectMonitorFuture.isCancelled()
+          || _reconnectMonitorFuture.isDone()) {
+        _reconnectMonitorFuture = _zkClientReconnectMonitor.schedule(() -> {
+          if (!_zkClient.getConnection().getZookeeperState().isConnected()) {

Review Comment:
   Should we check whether _zkClient and its connection object are null?
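
   A null-safe version of such a check could look like the sketch below. `Client` and `Connection` are hypothetical stand-in interfaces rather than the Helix types, and whether either object can actually be null at this point depends on ZkClient's behavior, which this sketch does not assume.

```java
class NullSafeConnectionCheck {
  /** Hypothetical stand-in for the connection type. */
  interface Connection {
    boolean isConnected();
  }

  /** Hypothetical stand-in for the client type. */
  interface Client {
    Connection getConnection();
  }

  /** Returns true only if the client, its connection, and the connected state all check out. */
  static boolean isConnected(Client client) {
    if (client == null) {
      return false;
    }
    Connection connection = client.getConnection();
    return connection != null && connection.isConnected();
  }

  public static void main(String[] args) {
    Client noConnection = () -> null;
    Connection live = () -> true;
    Client connected = () -> live;

    System.out.println("null client: " + isConnected(null));
    System.out.println("client without connection: " + isConnected(noConnection));
    System.out.println("connected client: " + isConnected(connected));
  }
}
```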



##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -266,18 +279,25 @@ public void asyncSet(String key, T data, int version, AsyncCallback.StatCallback
 
   @Override
   public void connect() {
-    // TODO: throws IllegalStateException when already connected
     try {
+      _zkClientConnectionMutex.lock();
       _zkClient.connect(_initConnectionTimeout, _zkClient);
+      // register this client as state change listener to react to ZkClient EXPIRED event.
+      // When ZkClient has expired connection to ZK, it sill auto reconnect until ZkClient
+      // is closed or connection re-established.
+      // We will need to close ZkClient when user set retry connection timeout.
+      _zkClient.subscribeStateChanges(this);
     } catch (ZkException e) {
       throw translateZkExceptionToMetaclientException(e);
+    } finally {
+      _zkClientConnectionMutex.unlock();
     }
   }
 
   @Override
   public void disconnect() {
-    // TODO: This is a temp impl for test only. no proper interrupt handling and error handling.
-    _zkClient.close();
+    cleanUpAndClose(true, true);
+    _zkClientReconnectMonitor.shutdownNow();

Review Comment:
   Don't we want to take the lock here?



##########
meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java:
##########
@@ -0,0 +1,150 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.policy.ExponentialBackoffReconnectPolicy;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
+import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+
+
+public class TestConnectStateChangeListenerAndRetry  {
+  protected static final String ZK_ADDR = "localhost:2181";
+  protected static ZkServer _zkServer;
+
+  private static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000;
+  private static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000;

Review Comment:
   Ideally these should be defined relative to the retry policy values in the MetaClient config, right? In case someone changes the reconnect wait time or other params in the future, should we use those variables to test different scenarios?
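
   As a rough illustration of deriving the test waits from the configured timeout rather than hard-coding them, consider the sketch below. The class name, method names, and the one-third and two-thirds ratios are assumptions chosen only so that a 3000 ms timeout reproduces the 1000 ms and 5000 ms values used in this test.

```java
class ReconnectTestTimings {
  /** A wait that is comfortably shorter than the reconnect timeout (one third of it). */
  static long waitWithin(long reconnectTimeoutMs) {
    return reconnectTimeoutMs / 3;
  }

  /** A wait that is comfortably longer than the reconnect timeout (timeout plus two thirds). */
  static long waitExceeding(long reconnectTimeoutMs) {
    return reconnectTimeoutMs + 2 * reconnectTimeoutMs / 3;
  }

  public static void main(String[] args) {
    long reconnectTimeoutMs = 3 * 1000;
    System.out.println("wait within timeout (ms): " + waitWithin(reconnectTimeoutMs));
    System.out.println("wait exceeding timeout (ms): " + waitExceeding(reconnectTimeoutMs));
  }
}
```

   If the timeout in the config changes later, both waits move with it and the "reconnected in time" and "timed out" scenarios stay on the intended side of the threshold.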



##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -394,4 +414,69 @@ public byte[] serialize(T data, String path) {
   public T deserialize(byte[] bytes, String path) {
     return _zkClient.deserialize(bytes, path);
   }
+
+  /**
+   * A clean up method called when connect state change or MetaClient is closing.
+   * @param cancel If we want to cancel the reconnect monitor thread.
+   * @param close If we want to close ZkClient.
+   */
+  private void cleanUpAndClose(boolean cancel, boolean close) {
+    _zkClientConnectionMutex.lock();
+
+    if (close && !_zkClient.isClosed()) {
+      _zkClient.close();
+      LOG.info("ZkClient is closed");
+    }
+
+    if (cancel && _reconnectMonitorFuture != null) {
+      _reconnectMonitorFuture.cancel(true);
+      LOG.info("ZkClient reconnect monitor thread is canceled");
+    }
+
+    _zkClientConnectionMutex.unlock();
+  }
+
+  // Schedule a monitor to track ZkClient auto reconnect when Disconnected
+  // Cancel the monitor thread when connected.
+  @Override
+  public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+    if (state == Watcher.Event.KeeperState.Disconnected) {
+      // Expired. start a new event monitoring retry
+      _zkClientConnectionMutex.lockInterruptibly();

Review Comment:
   Ideally a lock should protect one and only one resource. Here this lock is protecting both the "_zkClient" and "_reconnectMonitorFuture" resources. Open to discussing how we can simplify this, or whether there are any potential issues with this approach.
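
   One possible simplification, offered only as a sketch for discussion, is to move the future and its guard into a small helper so that one lock protects exactly one resource. `ReconnectMonitor`, `scheduleIfIdle`, and `cancel` are hypothetical names and are not part of this PR. The sketch relies on `Future.isDone()` returning true for cancelled tasks as well, so a separate `isCancelled()` check is not needed.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ReconnectMonitor implements AutoCloseable {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  // Guards `future` and nothing else.
  private final Object futureLock = new Object();
  private ScheduledFuture<?> future;

  /** Schedules onTimeout unless a monitor task is still pending. Returns true if scheduled. */
  boolean scheduleIfIdle(Runnable onTimeout, long delayMs) {
    synchronized (futureLock) {
      if (future != null && !future.isDone()) {
        return false;
      }
      future = scheduler.schedule(onTimeout, delayMs, TimeUnit.MILLISECONDS);
      return true;
    }
  }

  /** Cancels the pending monitor task, if any. */
  void cancel() {
    synchronized (futureLock) {
      if (future != null) {
        future.cancel(true);
      }
    }
  }

  @Override
  public void close() {
    cancel();
    scheduler.shutdownNow();
  }

  public static void main(String[] args) {
    try (ReconnectMonitor monitor = new ReconnectMonitor()) {
      Runnable onTimeout = () -> System.out.println("reconnect timed out");
      System.out.println("first schedule accepted: " + monitor.scheduleIfIdle(onTimeout, 60_000));
      System.out.println("second schedule accepted: " + monitor.scheduleIfIdle(onTimeout, 60_000));
      monitor.cancel();
      System.out.println("schedule after cancel accepted: " + monitor.scheduleIfIdle(onTimeout, 60_000));
    }
  }
}
```

   Closing the client would then be guarded separately (or not at all, if the client's own close is already safe to call concurrently), which keeps the two concerns from sharing a mutex.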



##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -394,4 +414,69 @@ public byte[] serialize(T data, String path) {
   public T deserialize(byte[] bytes, String path) {
     return _zkClient.deserialize(bytes, path);
   }
+
+  /**
+   * A clean up method called when connect state change or MetaClient is closing.
+   * @param cancel If we want to cancel the reconnect monitor thread.
+   * @param close If we want to close ZkClient.
+   */
+  private void cleanUpAndClose(boolean cancel, boolean close) {
+    _zkClientConnectionMutex.lock();
+
+    if (close && !_zkClient.isClosed()) {
+      _zkClient.close();
+      LOG.info("ZkClient is closed");
+    }
+
+    if (cancel && _reconnectMonitorFuture != null) {
+      _reconnectMonitorFuture.cancel(true);
+      LOG.info("ZkClient reconnect monitor thread is canceled");
+    }
+
+    _zkClientConnectionMutex.unlock();
+  }
+
+  // Schedule a monitor to track ZkClient auto reconnect when Disconnected
+  // Cancel the monitor thread when connected.
+  @Override
+  public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+    if (state == Watcher.Event.KeeperState.Disconnected) {
+      // Expired. start a new event monitoring retry
+      _zkClientConnectionMutex.lockInterruptibly();
+      if (_reconnectMonitorFuture == null || _reconnectMonitorFuture.isCancelled()
+          || _reconnectMonitorFuture.isDone()) {
+        _reconnectMonitorFuture = _zkClientReconnectMonitor.schedule(() -> {
+          if (!_zkClient.getConnection().getZookeeperState().isConnected()) {
+            cleanUpAndClose(false, true);
+          }
+        }, _reconnectTimeout, TimeUnit.MILLISECONDS);
+        LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after {}",
+            _reconnectTimeout);
+      }
+      _zkClientConnectionMutex.unlock();
+    } else if (state == Watcher.Event.KeeperState.SyncConnected
+        || state == Watcher.Event.KeeperState.ConnectedReadOnly) {
+      cleanUpAndClose(true, false);
+      LOG.info("ZkClient is SyncConnected, reconnect monitor thread is canceled (if any)");
+    }

Review Comment:
   I have seen this pattern in other places as well, but I am wondering whether we should add an error log, or at least some logging, for the "else" case here.



##########
meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestConnectStateChangeListenerAndRetry.java:
##########
@@ -0,0 +1,150 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.policy.ExponentialBackoffReconnectPolicy;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
+import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+
+
+public class TestConnectStateChangeListenerAndRetry  {
+  protected static final String ZK_ADDR = "localhost:2181";
+  protected static ZkServer _zkServer;
+
+  private static final long AUTO_RECONNECT_TIMEOUT_MS_FOR_TEST = 3 * 1000;
+  private static final long AUTO_RECONNECT_WAIT_TIME_WITHIN = 1 * 1000;
+  private static final long AUTO_RECONNECT_WAIT_TIME_EXD = 5 * 1000;
+
+  /**
+   * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
+   */
+  private static void simulateZkStateReconnected(ZkClient zkClient) throws InterruptedException {
+      WatchedEvent event =
+          new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected,
+              null);
+      zkClient.process(event);
+
+      Thread.sleep(AUTO_RECONNECT_WAIT_TIME_WITHIN);
+
+      event = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.SyncConnected,
+          null);
+      zkClient.process(event);
+  }
+
+  @BeforeSuite
+  public void prepare() {
+    // start local zookeeper server
+    _zkServer = ZkMetaClientTestBase.startZkServer(ZK_ADDR);
+  }
+
+  @AfterSuite
+  public void cleanUp() {
+

Review Comment:
   Don't we want to close the ZK server here? Another option is to extend the ZkMetaClientTestBase class.



##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -394,4 +414,69 @@ public byte[] serialize(T data, String path) {
   public T deserialize(byte[] bytes, String path) {
     return _zkClient.deserialize(bytes, path);
   }
+
+  /**
+   * A clean up method called when connect state change or MetaClient is closing.
+   * @param cancel If we want to cancel the reconnect monitor thread.
+   * @param close If we want to close ZkClient.
+   */
+  private void cleanUpAndClose(boolean cancel, boolean close) {
+    _zkClientConnectionMutex.lock();
+
+    if (close && !_zkClient.isClosed()) {
+      _zkClient.close();
+      LOG.info("ZkClient is closed");
+    }
+
+    if (cancel && _reconnectMonitorFuture != null) {
+      _reconnectMonitorFuture.cancel(true);
+      LOG.info("ZkClient reconnect monitor thread is canceled");
+    }
+
+    _zkClientConnectionMutex.unlock();
+  }
+
+  // Schedule a monitor to track ZkClient auto reconnect when Disconnected
+  // Cancel the monitor thread when connected.
+  @Override
+  public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+    if (state == Watcher.Event.KeeperState.Disconnected) {
+      // Expired. start a new event monitoring retry
+      _zkClientConnectionMutex.lockInterruptibly();
+      if (_reconnectMonitorFuture == null || _reconnectMonitorFuture.isCancelled()
+          || _reconnectMonitorFuture.isDone()) {
+        _reconnectMonitorFuture = _zkClientReconnectMonitor.schedule(() -> {
+          if (!_zkClient.getConnection().getZookeeperState().isConnected()) {
+            cleanUpAndClose(false, true);
+          }
+        }, _reconnectTimeout, TimeUnit.MILLISECONDS);
+        LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after {}",
+            _reconnectTimeout);
+      }
+      _zkClientConnectionMutex.unlock();

Review Comment:
   Just make sure that the "SyncConnected" else-if block is not in the critical section.
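
   To illustrate the intended lock scope, here is a minimal sketch in which the lock is acquired only inside the disconnected branch and released in a `finally` block, so the connected branch always starts without the lock held. `ConnectionStateHandler` and its `State` enum are hypothetical stand-ins, and the boolean flag stands in for the scheduled monitor future.

```java
import java.util.concurrent.locks.ReentrantLock;

class ConnectionStateHandler {
  /** Hypothetical stand-in for the ZooKeeper keeper states used in the handler. */
  enum State { DISCONNECTED, SYNC_CONNECTED, CONNECTED_READ_ONLY, EXPIRED }

  private final ReentrantLock mutex = new ReentrantLock();
  private boolean monitorScheduled = false;
  private boolean lockHeldInConnectedBranch = false;

  void handleStateChanged(State state) throws InterruptedException {
    if (state == State.DISCONNECTED) {
      mutex.lockInterruptibly();
      try {
        // Critical section: only the scheduling decision lives here.
        if (!monitorScheduled) {
          monitorScheduled = true;
        }
      } finally {
        // Released even if the body throws, so the lock cannot leak.
        mutex.unlock();
      }
    } else if (state == State.SYNC_CONNECTED || state == State.CONNECTED_READ_ONLY) {
      // Recorded for demonstration: the lock is not held on entry to this branch.
      lockHeldInConnectedBranch = mutex.isHeldByCurrentThread();
      cancelMonitor();
    }
  }

  private void cancelMonitor() {
    mutex.lock();
    try {
      monitorScheduled = false;
    } finally {
      mutex.unlock();
    }
  }

  boolean isMonitorScheduled() {
    return monitorScheduled;
  }

  boolean wasLockHeldInConnectedBranch() {
    return lockHeldInConnectedBranch;
  }

  public static void main(String[] args) throws InterruptedException {
    ConnectionStateHandler handler = new ConnectionStateHandler();
    handler.handleStateChanged(State.DISCONNECTED);
    System.out.println("monitor scheduled: " + handler.isMonitorScheduled());
    handler.handleStateChanged(State.SYNC_CONNECTED);
    System.out.println("monitor scheduled: " + handler.isMonitorScheduled());
    System.out.println("lock held in connected branch: " + handler.wasLockHeldInConnectedBranch());
  }
}
```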



##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -47,33 +52,41 @@
 import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
 import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
 
-public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
+
+public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable, IZkStateListener {
   private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class);
   private final ZkClient _zkClient;
   private final long _initConnectionTimeout;
   private final long _reconnectTimeout;
 
+  private final ScheduledExecutorService _zkClientReconnectMonitor;
+  private ScheduledFuture<?> _reconnectMonitorFuture;
+  private ReentrantLock _zkClientConnectionMutex = new ReentrantLock();
+
   public ZkMetaClient(ZkMetaClientConfig config) {
-    _initConnectionTimeout = config.getConnectionInitTimeoutInMillis();
+      _initConnectionTimeout = config.getConnectionInitTimeoutInMillis();
     _reconnectTimeout = config.getMetaClientReconnectPolicy().getAutoReconnectTimeout();
     // TODO: Right new ZkClient reconnect using exp backoff with fixed max backoff interval. We should
-    // 1. Allow user to config max backoff interval (next PR)
-    // 2. Allow user to config reconnect policy (future PR)
+    // 1. Allow user to config max and init backoff interval
+    // 2. Allow user to config reconnect policy
     _zkClient = new ZkClient(
         new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()),
         (int) _initConnectionTimeout, _reconnectTimeout /*use reconnect timeout for retry timeout*/,
         config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(),
         config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false);
+    _zkClientReconnectMonitor = Executors.newScheduledThreadPool(1);

Review Comment:
   I'm not an expert, but I see a few suggestions about the benefits of using newSingleThreadScheduledExecutor over newScheduledThreadPool with a single thread. ([reference](https://stackoverflow.com/questions/30028825/difference-between-newscheduledthreadpool1-and-newsinglethreadscheduledexecuto#:~:text=The%20only%20difference%20between%20newSingleThreadScheduledExecutor,reconfigurable%20to%20use%20additional%20threads.&text=The%20difference%20is%20the%20DelegatedScheduledExecutorService%20exposes%20only%20schedule%20and%20invoke%20related%20methods.))
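
   The difference can be observed with the JDK alone. The sketch below checks whether each executor can be cast to `ScheduledThreadPoolExecutor`, which is what would allow a caller to reconfigure the pool size later. `ExecutorComparison` and `isReconfigurable` are names made up for this illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

class ExecutorComparison {
  /** True if the executor exposes the ScheduledThreadPoolExecutor configuration methods. */
  static boolean isReconfigurable(ScheduledExecutorService executor) {
    return executor instanceof ScheduledThreadPoolExecutor;
  }

  public static void main(String[] args) {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    ScheduledExecutorService single = Executors.newSingleThreadScheduledExecutor();
    try {
      System.out.println("newScheduledThreadPool(1) reconfigurable: " + isReconfigurable(pool));
      System.out.println(
          "newSingleThreadScheduledExecutor() reconfigurable: " + isReconfigurable(single));
    } finally {
      pool.shutdownNow();
      single.shutdownNow();
    }
  }
}
```

   Since the reconnect monitor only ever needs one thread, the wrapped single-thread executor documents that intent and prevents the thread count from being changed by accident.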



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

