You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2022/05/11 20:35:04 UTC

[helix] branch zookeeper-api-ttlcontainer updated: Add TTL mode to async create API in ZkClient (#2082)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zookeeper-api-ttlcontainer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/zookeeper-api-ttlcontainer by this push:
     new 881f06e7a Add TTL mode to async create API in ZkClient (#2082)
881f06e7a is described below

commit 881f06e7a10b1cdf22dae2dd1c03f01b62128a0b
Author: Ramin Bashizade <ra...@linkedin.com>
AuthorDate: Wed May 11 13:35:00 2022 -0700

    Add TTL mode to async create API in ZkClient (#2082)
    
    This PR adds methods that support creating persistent nodes with TTL asynchronously to ZkClient.
---
 helix-core/helix-core-1.0.4-SNAPSHOT.ivy           |  2 +-
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 25 +++++++++++++++++-----
 .../zkclient/callback/ZkAsyncCallbacks.java        |  8 ++++++-
 zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy     |  2 +-
 4 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/helix-core/helix-core-1.0.4-SNAPSHOT.ivy b/helix-core/helix-core-1.0.4-SNAPSHOT.ivy
index e4fffbe07..80ed665a1 100755
--- a/helix-core/helix-core-1.0.4-SNAPSHOT.ivy
+++ b/helix-core/helix-core-1.0.4-SNAPSHOT.ivy
@@ -52,7 +52,7 @@ under the License.
     <dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" rev="2.17.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
         <artifact name="log4j-slf4j-impl" ext="jar"/>
     </dependency>
-    <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.13" conf="compile->compile(default);runtime->runtime(default);default->default"/>
+    <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.5.9" conf="compile->compile(default);runtime->runtime(default);default->default"/>
 		<dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="2.12.6.1" conf="compile->compile(default);runtime->runtime(default);default->default"/>
     <dependency org="com.fasterxml.jackson.core" name="jackson-core" rev="2.12.6" conf="compile->compile(default);runtime->runtime(default);default->default"/>
     <dependency org="commons-io" name="commons-io" rev="2.11.0" conf="compile->compile(default);runtime->runtime(default);default->default"/>
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index be729205c..c6b74239b 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -85,6 +85,7 @@ import org.slf4j.LoggerFactory;
 public class ZkClient implements Watcher {
   private static final Logger LOG = LoggerFactory.getLogger(ZkClient.class);
 
+  public static final long TTL_NOT_SET = -1L;
   private static final long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
 
   // If number of children exceeds this limit, getChildren() should not retry on connection loss.
@@ -1937,10 +1938,10 @@ public class ZkClient implements Watcher {
           new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
       return;
     }
-    doAsyncCreate(path, data, mode, startT, cb, parseExpectedSessionId(datat));
+    doAsyncCreate(path, data, mode, TTL_NOT_SET, startT, cb, parseExpectedSessionId(datat));
   }
 
-  private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode,
+  private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode, long ttl,
       final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb, final String expectedSessionId) {
     try {
       retryUntilConnected(() -> {
@@ -1949,19 +1950,33 @@ public class ZkClient implements Watcher {
                 GZipCompressionUtil.isCompressed(data)) {
               @Override
               protected void doRetry() {
-                doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb, expectedSessionId);
+                doAsyncCreate(path, data, mode, ttl, System.currentTimeMillis(), cb, expectedSessionId);
               }
-            });
+            }, ttl);
         return null;
       });
     } catch (RuntimeException e) {
       // Process callback to release caller from waiting
       cb.processResult(KeeperException.Code.APIERROR.intValue(), path,
-          new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
+          new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null, null);
       throw e;
     }
   }
 
+  public void asyncCreate(final String path, Object datat, final CreateMode mode, long ttl,
+      final ZkAsyncCallbacks.CreateCallbackHandler cb) {
+    final long startT = System.currentTimeMillis();
+    final byte[] data;
+    try {
+      data = (datat == null ? null : serialize(datat, path));
+    } catch (ZkMarshallingError e) {
+      cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+          new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null, null);
+      return;
+    }
+    doAsyncCreate(path, data, mode, ttl, startT, cb, parseExpectedSessionId(datat));
+  }
+
   // Async Data Accessors
   public void asyncSetData(final String path, Object datat, final int version,
       final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 506d23481..72e2b95b7 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
 import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.Create2Callback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
@@ -111,12 +112,17 @@ public class ZkAsyncCallbacks {
     }
   }
 
-  public static class CreateCallbackHandler extends DefaultCallback implements StringCallback {
+  public static class CreateCallbackHandler extends DefaultCallback implements StringCallback, Create2Callback {
     @Override
     public void processResult(int rc, String path, Object ctx, String name) {
       callback(rc, path, ctx);
     }
 
+    @Override
+    public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
+      callback(rc, path, ctx);
+    }
+
     @Override
     public void handle() {
       // TODO Auto-generated method stub
diff --git a/zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy b/zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy
index 0e57930f0..ad604c962 100644
--- a/zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy
+++ b/zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy
@@ -43,7 +43,7 @@ under the License.
     <dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" rev="2.17.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
         <artifact name="log4j-slf4j-impl" ext="jar"/>
     </dependency>
-    <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.13" conf="compile->compile(default);runtime->runtime(default);default->default"/>
+    <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.5.9" conf="compile->compile(default);runtime->runtime(default);default->default"/>
 		<dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="2.12.6.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
 		<dependency org="com.fasterxml.jackson.core" name="jackson-core" rev="2.12.6" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
 		<dependency org="commons-cli" name="commons-cli" rev="1.2" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>