You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/03/11 06:26:08 UTC

[helix] branch metaclient updated: Add retry policy class and config for operation retry for MetaClient (#2385)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new 2b636132b Add retry policy class and config for operation retry for MetaClient (#2385)
2b636132b is described below

commit 2b636132b8e357f56f10d7db346b165375dd03d8
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Fri Mar 10 22:26:01 2023 -0800

    Add retry policy class and config for operation retry for MetaClient (#2385)
---
 .../metaclient/constants/MetaClientConstants.java  |  5 ++
 .../metaclient/factories/MetaClientConfig.java     | 71 ++++++++++++++++++----
 .../helix/metaclient/impl/zk/ZkMetaClient.java     |  5 +-
 .../impl/zk/factory/ZkMetaClientConfig.java        | 18 +++---
 .../policy/ExponentialBackoffReconnectPolicy.java  | 53 ++++++++++++++++
 .../MetaClientReconnectPolicy.java}                | 27 ++++----
 .../NoRetryReconnectPolicy.java}                   | 29 ++++-----
 7 files changed, 155 insertions(+), 53 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java
index e38b1f6a9..2c554189b 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java
@@ -36,6 +36,11 @@ public final class MetaClientConstants {
   // new session will be established.
   public static final int DEFAULT_SESSION_TIMEOUT_MS = 30 * 1000;
 
+  // Max backoff window for the exponential backoff reconnect policy. Defaults to 30 seconds.
+  public static final long DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS = 30 * 1000;
 
+  // Initial backoff window for the exponential backoff reconnect policy. Defaults to 500 ms.
+  public static final long DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS = 500;
 
+  //public static final long DEFAULT_MAX_LINEAR_BACKOFF_RETRY_WINDOW_MS = 5*1000;
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientConfig.java
index 11f16e5b5..081727741 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientConfig.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/factories/MetaClientConfig.java
@@ -19,6 +19,8 @@ package org.apache.helix.metaclient.factories;
  * under the License.
  */
 
+import org.apache.helix.metaclient.policy.ExponentialBackoffReconnectPolicy;
+import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy;
 import org.apache.helix.metaclient.constants.MetaClientConstants;
 
 public class MetaClientConfig {
@@ -32,10 +34,18 @@ public class MetaClientConfig {
   // Wait for init timeout time until connection is initiated
   private final long _connectionInitTimeoutInMillis;
 
+  // An operation that failed because the connection was lost is automatically retried if the
+  // connection recovers within this timeout.
+  private final long _operationRetryTimeoutInMillis;
+
   // When a client becomes partitioned from the metadata service for more than session timeout,
   // new session will be established when reconnect.
   private final long _sessionTimeoutInMillis;
 
+  // Policy to define client re-establish connection behavior when the connection to underlying
+  // metadata store is expired.
+  private final MetaClientReconnectPolicy _metaClientReconnectPolicy;
+
   private final boolean _enableAuth;
   private final StoreType _storeType;
 
@@ -47,6 +57,10 @@ public class MetaClientConfig {
     return _connectionInitTimeoutInMillis;
   }
 
+  public long getOperationRetryTimeoutInMillis() {
+    return _operationRetryTimeoutInMillis;
+  }
+
   public boolean isAuthEnabled() {
     return _enableAuth;
   }
@@ -59,21 +73,22 @@ public class MetaClientConfig {
     return _sessionTimeoutInMillis;
   }
 
+  public MetaClientReconnectPolicy getMetaClientReconnectPolicy() {
+    return _metaClientReconnectPolicy;
+  }
+
   // TODO: More options to add later
   // private boolean _autoReRegistWatcher;  // re-register one time watcher when set to true
   // private boolean _resetWatchWhenReConnect; // re-register previous existing watcher when reconnect
-  //
-  //  public enum RetryProtocol {
-  //    NO_RETRY, EXP_BACK_OFF, CONST_RETRY_INTERVAL
-  //  }
-  //  private RetryProtocol _retryProtocol;
-
 
   protected MetaClientConfig(String connectionAddress, long connectionInitTimeoutInMillis,
-      long sessionTimeoutInMillis, boolean enableAuth, StoreType storeType) {
+      long operationRetryTimeoutInMillis, long sessionTimeoutInMillis,
+      MetaClientReconnectPolicy metaClientReconnectPolicy, boolean enableAuth, StoreType storeType) {
     _connectionAddress = connectionAddress;
     _connectionInitTimeoutInMillis = connectionInitTimeoutInMillis;
+    _operationRetryTimeoutInMillis = operationRetryTimeoutInMillis;
     _sessionTimeoutInMillis = sessionTimeoutInMillis;
+    _metaClientReconnectPolicy = metaClientReconnectPolicy;
     _enableAuth = enableAuth;
     _storeType = storeType;
   }
@@ -83,17 +98,16 @@ public class MetaClientConfig {
 
     protected long _connectionInitTimeoutInMillis;
     protected long _sessionTimeoutInMillis;
-    // protected long _operationRetryTimeout;
-    // protected RetryProtocol _retryProtocol;
+    protected long _operationRetryTimeout;
     protected boolean _enableAuth;
     protected StoreType _storeType;
+    protected MetaClientReconnectPolicy _metaClientReconnectPolicy;
 
 
     public MetaClientConfig build() {
       validate();
       return new MetaClientConfig(_connectionAddress, _connectionInitTimeoutInMillis,
-          _sessionTimeoutInMillis,
-          _enableAuth, _storeType);
+          _operationRetryTimeout, _sessionTimeoutInMillis, _metaClientReconnectPolicy, _enableAuth, _storeType);
     }
 
     public MetaClientConfigBuilder() {
@@ -114,7 +128,7 @@ public class MetaClientConfig {
     }
 
     /**
-     * Set timeout in mm for connection initialization timeout
+     * Set timeout in ms for connection initialization timeout
      * @param timeout
      * @return
      */
@@ -123,6 +137,27 @@ public class MetaClientConfig {
       return self();
     }
 
+    /**
+     * Set the operation retry timeout in ms
+     * @param timeout
+     * @return
+     */
+    public B setOperationRetryTimeoutInMillis(long timeout) {
+      _operationRetryTimeout = timeout;
+      return self();
+    }
+
+    /**
+     * Set the reconnect policy used when the connection is lost or expired. Defaults to
+     * ExponentialBackoffReconnectPolicy.
+     * @param reconnectPolicy an instance of type MetaClientReconnectPolicy
+     * @return
+     */
+    public B setMetaClientReconnectPolicy(MetaClientReconnectPolicy reconnectPolicy) {
+      _metaClientReconnectPolicy = reconnectPolicy;
+      return self();
+    }
+
     /**
      * Set timeout in mm for session timeout. When a client becomes partitioned from the metadata
      * service for more than session timeout, new session will be established.
@@ -145,6 +180,18 @@ public class MetaClientConfig {
     }
 
     protected void validate() {
+      if (_metaClientReconnectPolicy == null) {
+        _metaClientReconnectPolicy = new ExponentialBackoffReconnectPolicy();
+      }
+
+      // check if reconnect policy and retry policy conflict.
+      if (_metaClientReconnectPolicy.getPolicyName()
+          == MetaClientReconnectPolicy.RetryPolicyName.NO_RETRY && _operationRetryTimeout > 0) {
+        throw new IllegalArgumentException(
+            "MetaClientConfig.Builder: Incompatible operationRetryTimeout with NO_RETRY ReconnectPolicy.");
+      }
+      // TODO: check operationRetryTimeout should be less than ReconnectPolicy timeout.
+
       if (_storeType == null || _connectionAddress == null) {
         throw new IllegalArgumentException(
             "MetaClientConfig.Builder: store type or connection string is null");
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 7934fee8a..d3542e1fa 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -63,9 +63,12 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
 
   public ZkMetaClient(ZkMetaClientConfig config) {
     _connectionTimeout = (int) config.getConnectionInitTimeoutInMillis();
+    // TODO: Right now ZkClient reconnects using exp backoff with a fixed max backoff interval. We should
+    // 1. Allow user to config max backoff interval (next PR)
+    // 2. Allow user to config reconnect policy (future PR)
     _zkClient = new ZkClient(
         new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()),
-        _connectionTimeout, -1 /*operationRetryTimeout*/, config.getZkSerializer(),
+        _connectionTimeout, config.getOperationRetryTimeoutInMillis(), config.getZkSerializer(),
         config.getMonitorType(), config.getMonitorKey(), config.getMonitorInstanceName(),
         config.getMonitorRootPathOnly(), false);
   }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
index d9292f846..3f21fa7df 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/factory/ZkMetaClientConfig.java
@@ -19,6 +19,7 @@ package org.apache.helix.metaclient.impl.zk.factory;
  * under the License.
  */
 
+import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
 import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
@@ -60,17 +61,17 @@ public class ZkMetaClientConfig extends MetaClientConfig {
   }
 
   protected ZkMetaClientConfig(String connectionAddress, long connectionInitTimeoutInMillis,
-      long sessionTimeoutInMillis, boolean enableAuth, StoreType storeType, String monitorType,
-      String monitorKey, String monitorInstanceName, boolean monitorRootPathOnly,
-      PathBasedZkSerializer zkSerializer) {
-    super(connectionAddress, connectionInitTimeoutInMillis, sessionTimeoutInMillis, enableAuth,
-        storeType);
+      long operationRetryTimeoutInMillis, long sessionTimeoutInMillis,
+      MetaClientReconnectPolicy reconnectPolicy, boolean enableAuth, StoreType storeType,
+      String monitorType, String monitorKey, String monitorInstanceName,
+      boolean monitorRootPathOnly, PathBasedZkSerializer zkSerializer) {
+    super(connectionAddress, connectionInitTimeoutInMillis, operationRetryTimeoutInMillis,
+        sessionTimeoutInMillis, reconnectPolicy, enableAuth, storeType);
     _zkSerializer = zkSerializer;
     _monitorType = monitorType;
     _monitorKey = monitorKey;
     _monitorInstanceName = monitorInstanceName;
     _monitorRootPathOnly = monitorRootPathOnly;
-
   }
 
   public static class ZkMetaClientConfigBuilder extends MetaClientConfig.MetaClientConfigBuilder<ZkMetaClientConfigBuilder> {
@@ -133,8 +134,9 @@ public class ZkMetaClientConfig extends MetaClientConfig {
         _zkSerializer = new BasicZkSerializer(new SerializableSerializer());
       }
       return new ZkMetaClientConfig(_connectionAddress, _connectionInitTimeoutInMillis,
-          _sessionTimeoutInMillis, _enableAuth, MetaClientConfig.StoreType.ZOOKEEPER, _monitorType,
-          _monitorKey, _monitorInstanceName, _monitorRootPathOnly, _zkSerializer);
+          _operationRetryTimeout, _sessionTimeoutInMillis, _metaClientReconnectPolicy, _enableAuth,
+          MetaClientConfig.StoreType.ZOOKEEPER, _monitorType, _monitorKey, _monitorInstanceName,
+          _monitorRootPathOnly, _zkSerializer);
     }
 
     @Override
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java
new file mode 100644
index 000000000..81e0c44f7
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/policy/ExponentialBackoffReconnectPolicy.java
@@ -0,0 +1,53 @@
+package org.apache.helix.metaclient.policy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy;
+
+import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
+import static org.apache.helix.metaclient.constants.MetaClientConstants.DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+
+/**
+ * Policy to define client re-establish connection behavior when connection to underlying metadata
+ * store is expired.
+ * The wait time for each backoff period increases exponentially, up to a user-defined max
+ * backoff interval.
+ */
+public class ExponentialBackoffReconnectPolicy implements MetaClientReconnectPolicy {
+
+  private final long _maxBackOffInterval;
+  private final long _initBackoffInterval;
+
+  @Override
+  public RetryPolicyName getPolicyName() {
+    return RetryPolicyName.EXP_BACKOFF;
+  }
+
+  public ExponentialBackoffReconnectPolicy() {
+    _initBackoffInterval = DEFAULT_INIT_EXP_BACKOFF_RETRY_INTERVAL_MS;
+    _maxBackOffInterval = DEFAULT_MAX_EXP_BACKOFF_RETRY_INTERVAL_MS;
+  }
+
+  public ExponentialBackoffReconnectPolicy(long maxBackOffInterval, long initBackoffInterval) {
+    _maxBackOffInterval = maxBackOffInterval;
+    _initBackoffInterval = initBackoffInterval;
+
+  }
+}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java
similarity index 55%
copy from meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java
copy to meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java
index e38b1f6a9..3d4cba3de 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/policy/MetaClientReconnectPolicy.java
@@ -1,4 +1,4 @@
-package org.apache.helix.metaclient.constants;
+package org.apache.helix.metaclient.policy;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,23 +19,20 @@ package org.apache.helix.metaclient.constants;
  * under the License.
  */
 
-public final class MetaClientConstants {
+/**
+ * Policy to define client re-establish connection behavior when connection to underlying metadata
+ * store is expired.
+ */
 
-  private MetaClientConstants(){
+public interface MetaClientReconnectPolicy {
 
+  enum RetryPolicyName {
+    NO_RETRY,
+    EXP_BACKOFF,
+    LINEAR_BACKOFF
   }
 
-  // Stop retrying when we reach timeout
-  //TODO The value should be the same as Helix default ZK retry time. Modify when change #2293 merged
-  public static final int DEFAULT_OPERATION_RETRY_TIMEOUT_MS = Integer.MAX_VALUE;
-
-  // maxMsToWaitUntilConnected
-  public static final int DEFAULT_CONNECTION_INIT_TIMEOUT_MS = 60 * 1000;
-
-  // When a client becomes partitioned from the metadata service for more than session timeout,
-  // new session will be established.
-  public static final int DEFAULT_SESSION_TIMEOUT_MS = 30 * 1000;
-
-
+  RetryPolicyName getPolicyName();
 
+  // TODO: add reconnect timeout
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java b/meta-client/src/main/java/org/apache/helix/metaclient/policy/NoRetryReconnectPolicy.java
similarity index 55%
copy from meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java
copy to meta-client/src/main/java/org/apache/helix/metaclient/policy/NoRetryReconnectPolicy.java
index e38b1f6a9..f81273b3a 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/constants/MetaClientConstants.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/policy/NoRetryReconnectPolicy.java
@@ -1,4 +1,4 @@
-package org.apache.helix.metaclient.constants;
+package org.apache.helix.metaclient.policy;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,23 +19,18 @@ package org.apache.helix.metaclient.constants;
  * under the License.
  */
 
-public final class MetaClientConstants {
+import org.apache.helix.metaclient.policy.MetaClientReconnectPolicy;
 
-  private MetaClientConstants(){
 
+/**
+ * Policy to define client re-establish connection behavior when connection to underlying metadata
+ * store is expired.
+ * If this policy is passed to MetaClient, no automatic reconnect is attempted when the
+ * connection is lost or expired.
+ */
+public class NoRetryReconnectPolicy implements MetaClientReconnectPolicy {
+  @Override
+  public RetryPolicyName getPolicyName() {
+    return RetryPolicyName.NO_RETRY;
   }
-
-  // Stop retrying when we reach timeout
-  //TODO The value should be the same as Helix default ZK retry time. Modify when change #2293 merged
-  public static final int DEFAULT_OPERATION_RETRY_TIMEOUT_MS = Integer.MAX_VALUE;
-
-  // maxMsToWaitUntilConnected
-  public static final int DEFAULT_CONNECTION_INIT_TIMEOUT_MS = 60 * 1000;
-
-  // When a client becomes partitioned from the metadata service for more than session timeout,
-  // new session will be established.
-  public static final int DEFAULT_SESSION_TIMEOUT_MS = 30 * 1000;
-
-
-
 }