You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2022/05/11 20:35:04 UTC
[helix] branch zookeeper-api-ttlcontainer updated: Add TTL mode to async create API in ZkClient (#2082)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zookeeper-api-ttlcontainer
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/zookeeper-api-ttlcontainer by this push:
new 881f06e7a Add TTL mode to async create API in ZkClient (#2082)
881f06e7a is described below
commit 881f06e7a10b1cdf22dae2dd1c03f01b62128a0b
Author: Ramin Bashizade <ra...@linkedin.com>
AuthorDate: Wed May 11 13:35:00 2022 -0700
Add TTL mode to async create API in ZkClient (#2082)
This PR adds methods that support creating persistent nodes with TTL asynchronously to ZkClient.
---
helix-core/helix-core-1.0.4-SNAPSHOT.ivy | 2 +-
.../apache/helix/zookeeper/zkclient/ZkClient.java | 25 +++++++++++++++++-----
.../zkclient/callback/ZkAsyncCallbacks.java | 8 ++++++-
zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy | 2 +-
4 files changed, 29 insertions(+), 8 deletions(-)
diff --git a/helix-core/helix-core-1.0.4-SNAPSHOT.ivy b/helix-core/helix-core-1.0.4-SNAPSHOT.ivy
index e4fffbe07..80ed665a1 100755
--- a/helix-core/helix-core-1.0.4-SNAPSHOT.ivy
+++ b/helix-core/helix-core-1.0.4-SNAPSHOT.ivy
@@ -52,7 +52,7 @@ under the License.
<dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" rev="2.17.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
<artifact name="log4j-slf4j-impl" ext="jar"/>
</dependency>
- <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.13" conf="compile->compile(default);runtime->runtime(default);default->default"/>
+ <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.5.9" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="2.12.6.1" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="com.fasterxml.jackson.core" name="jackson-core" rev="2.12.6" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="commons-io" name="commons-io" rev="2.11.0" conf="compile->compile(default);runtime->runtime(default);default->default"/>
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index be729205c..c6b74239b 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -85,6 +85,7 @@ import org.slf4j.LoggerFactory;
public class ZkClient implements Watcher {
private static final Logger LOG = LoggerFactory.getLogger(ZkClient.class);
+ public static final long TTL_NOT_SET = -1L;
private static final long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
// If number of children exceeds this limit, getChildren() should not retry on connection loss.
@@ -1937,10 +1938,10 @@ public class ZkClient implements Watcher {
new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
return;
}
- doAsyncCreate(path, data, mode, startT, cb, parseExpectedSessionId(datat));
+ doAsyncCreate(path, data, mode, TTL_NOT_SET, startT, cb, parseExpectedSessionId(datat));
}
- private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode,
+ private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode, long ttl,
final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb, final String expectedSessionId) {
try {
retryUntilConnected(() -> {
@@ -1949,19 +1950,33 @@ public class ZkClient implements Watcher {
GZipCompressionUtil.isCompressed(data)) {
@Override
protected void doRetry() {
- doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb, expectedSessionId);
+ doAsyncCreate(path, data, mode, ttl, System.currentTimeMillis(), cb, expectedSessionId);
}
- });
+ }, ttl);
return null;
});
} catch (RuntimeException e) {
// Process callback to release caller from waiting
cb.processResult(KeeperException.Code.APIERROR.intValue(), path,
- new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
+ new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null, null);
throw e;
}
}
+ public void asyncCreate(final String path, Object datat, final CreateMode mode, long ttl,
+ final ZkAsyncCallbacks.CreateCallbackHandler cb) {
+ final long startT = System.currentTimeMillis();
+ final byte[] data;
+ try {
+ data = (datat == null ? null : serialize(datat, path));
+ } catch (ZkMarshallingError e) {
+ cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+ new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null, null);
+ return;
+ }
+ doAsyncCreate(path, data, mode, ttl, startT, cb, parseExpectedSessionId(datat));
+ }
+
// Async Data Accessors
public void asyncSetData(final String path, Object datat, final int version,
final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 506d23481..72e2b95b7 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.Create2Callback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
@@ -111,12 +112,17 @@ public class ZkAsyncCallbacks {
}
}
- public static class CreateCallbackHandler extends DefaultCallback implements StringCallback {
+ public static class CreateCallbackHandler extends DefaultCallback implements StringCallback, Create2Callback {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
callback(rc, path, ctx);
}
+ @Override
+ public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
+ callback(rc, path, ctx);
+ }
+
@Override
public void handle() {
// TODO Auto-generated method stub
diff --git a/zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy b/zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy
index 0e57930f0..ad604c962 100644
--- a/zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy
+++ b/zookeeper-api/zookeeper-api-1.0.4-SNAPSHOT.ivy
@@ -43,7 +43,7 @@ under the License.
<dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" rev="2.17.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
<artifact name="log4j-slf4j-impl" ext="jar"/>
</dependency>
- <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.4.13" conf="compile->compile(default);runtime->runtime(default);default->default"/>
+ <dependency org="org.apache.zookeeper" name="zookeeper" rev="3.5.9" conf="compile->compile(default);runtime->runtime(default);default->default"/>
<dependency org="com.fasterxml.jackson.core" name="jackson-databind" rev="2.12.6.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="com.fasterxml.jackson.core" name="jackson-core" rev="2.12.6" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="commons-cli" name="commons-cli" rev="1.2" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>