You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ne...@apache.org on 2022/06/03 18:06:16 UTC
[helix] 04/05: Add TTL and Container modes to BaseDataAccessor and its implementations (#2107)
This is an automated email from the ASF dual-hosted git repository.
nealsun pushed a commit to branch zookeeper-api-ttlcontainer
in repository https://gitbox.apache.org/repos/asf/helix.git
commit d820d29f85f880f1c88951503bd802309eec6801
Author: Ramin Bashizade <ra...@linkedin.com>
AuthorDate: Tue May 24 10:08:20 2022 -0700
Add TTL and Container modes to BaseDataAccessor and its implementations (#2107)
This commit adds support for TTL and Container modes to BaseDataAccessor and its implementations by taking advantage of the relevant APIs in ZkClient and its descendant classes.
---
.../java/org/apache/helix/BaseDataAccessor.java | 23 +++++
.../helix/manager/zk/ZkBaseDataAccessor.java | 66 +++++++++++--
.../helix/manager/zk/ZkCacheBaseDataAccessor.java | 19 +++-
.../helix/manager/zk/TestZkBaseDataAccessor.java | 109 +++++++++++++++++++++
.../apache/helix/mock/MockBaseDataAccessor.java | 11 +++
5 files changed, 217 insertions(+), 11 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
index e556ccfed..9c639d817 100644
--- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
@@ -43,6 +43,18 @@ public interface BaseDataAccessor<T> {
*/
boolean create(String path, T record, int options);
+ /**
+ * This will always attempt to create the ZNode; if it already exists, it will return false.
+ * It will create parents if they do not exist. For performance reasons, it may try to create
+ * the child first and, only if that fails, try to create the parents.
+ * @param path path to the ZNode to create
+ * @param record the data to write to the ZNode
+ * @param options Set the type of ZNode see the valid values in {@link AccessOption}
+ * @param ttl TTL of the node in milliseconds, if options supports it
+ * @return true if creation succeeded, false otherwise (e.g. if the ZNode exists)
+ */
+ boolean create(String path, T record, int options, long ttl);
+
/**
* This will always attempt to set the data on existing node. If the ZNode does not
* exist it will create it and all its parents ZNodes if necessary
@@ -95,6 +107,17 @@ public interface BaseDataAccessor<T> {
*/
boolean[] createChildren(List<String> paths, List<T> records, int options);
+ /**
+ * Use it when creating children under a parent node. This will use the async API for better
+ * performance. If a child already exists, it will return false for that child.
+ * @param paths the paths to the children ZNodes
+ * @param records List of data to write to each of the path
+ * @param options Set the type of ZNode see the valid values in {@link AccessOption}
+ * @param ttl TTL of the node in milliseconds, if options supports it
+ * @return For each child: true if creation succeeded, false otherwise (e.g. if the child exists)
+ */
+ boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl);
+
/**
* can set multiple children under a parent node. This will use async api for better
* performance. If this child does not exist it will create it.
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 4e40d413c..0c7a94f1b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -47,6 +47,7 @@ import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
@@ -249,7 +250,15 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
*/
@Override
public boolean create(String path, T record, int options) {
- AccessResult result = doCreate(path, record, options);
+ return create(path, record, options, ZkClient.TTL_NOT_SET);
+ }
+
+ /**
+ * sync create with TTL
+ */
+ @Override
+ public boolean create(String path, T record, int options, long ttl) {
+ AccessResult result = doCreate(path, record, options, ttl);
return result._retCode == RetCode.OK;
}
@@ -257,6 +266,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
* sync create
*/
public AccessResult doCreate(String path, T record, int options) {
+ return doCreate(path, record, options, ZkClient.TTL_NOT_SET);
+ }
+
+ /**
+ * sync create with TTL
+ */
+ public AccessResult doCreate(String path, T record, int options, long ttl) {
AccessResult result = new AccessResult();
CreateMode mode = AccessOption.getMode(options);
if (mode == null) {
@@ -269,7 +285,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
do {
retry = false;
try {
- _zkClient.create(path, record, mode);
+ _zkClient.create(path, record, mode, ttl);
result._pathCreated.add(path);
result._retCode = RetCode.OK;
@@ -278,7 +294,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
// this will happen if parent node does not exist
String parentPath = HelixUtil.getZkParentPath(path);
try {
- AccessResult res = doCreate(parentPath, null, AccessOption.PERSISTENT);
+ AccessResult res;
+ if (mode.isTTL()) {
+ res = doCreate(parentPath, null, options, ttl);
+ } else if (mode.isContainer()) {
+ res = doCreate(parentPath, null, AccessOption.CONTAINER);
+ } else {
+ res = doCreate(parentPath, null, AccessOption.PERSISTENT);
+ }
result._pathCreated.addAll(res._pathCreated);
RetCode rc = res._retCode;
if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS) {
@@ -720,6 +743,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
*/
ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records,
boolean[] needCreate, List<List<String>> pathsCreated, int options) {
+ return create(paths, records, needCreate, pathsCreated, options, ZkClient.TTL_NOT_SET);
+ }
+
+ /**
+ * Async create with TTL. Gives up on any error other than NONODE.
+ */
+ ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records,
+ boolean[] needCreate, List<List<String>> pathsCreated, int options, long ttl) {
if ((records != null && records.size() != paths.size()) || needCreate.length != paths.size()
|| (pathsCreated != null && pathsCreated.size() != paths.size())) {
throw new IllegalArgumentException(
@@ -747,7 +778,11 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
String path = paths.get(i);
T record = records == null ? null : records.get(i);
cbList[i] = new ZkAsyncCallbacks.CreateCallbackHandler();
- _zkClient.asyncCreate(path, record, mode, cbList[i]);
+ if (mode.isTTL()) {
+ _zkClient.asyncCreate(path, record, mode, ttl, cbList[i]);
+ } else {
+ _zkClient.asyncCreate(path, record, mode, cbList[i]);
+ }
}
List<String> parentPaths = new ArrayList<>(Collections.<String>nCopies(paths.size(), null));
@@ -784,8 +819,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
if (failOnNoNode) {
boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length);
- ZkAsyncCallbacks.CreateCallbackHandler[] parentCbList =
- create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
+ ZkAsyncCallbacks.CreateCallbackHandler[] parentCbList;
+ if (mode.isTTL()) {
+ parentCbList = create(parentPaths, null, needCreateParent, pathsCreated, options, ttl);
+ } else if (mode.isContainer()) {
+ parentCbList =
+ create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.CONTAINER);
+ } else {
+ parentCbList =
+ create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
+ }
for (int i = 0; i < parentCbList.length; i++) {
ZkAsyncCallbacks.CreateCallbackHandler parentCb = parentCbList[i];
if (parentCb == null) {
@@ -812,6 +855,15 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
*/
@Override
public boolean[] createChildren(List<String> paths, List<T> records, int options) {
+ return createChildren(paths, records, options, ZkClient.TTL_NOT_SET);
+ }
+
+ /**
+ * async create with TTL
+ * TODO: rename to create
+ */
+ @Override
+ public boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl) {
boolean[] success = new boolean[paths.size()];
CreateMode mode = AccessOption.getMode(options);
@@ -829,7 +881,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
try {
ZkAsyncCallbacks.CreateCallbackHandler[] cbList =
- create(paths, records, needCreate, pathsCreated, options);
+ create(paths, records, needCreate, pathsCreated, options, ttl);
for (int i = 0; i < cbList.length; i++) {
ZkAsyncCallbacks.CreateCallbackHandler cb = cbList[i];
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index e16519958..6a635a5da 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -38,6 +38,7 @@ import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
@@ -225,6 +226,11 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
@Override
public boolean create(String path, T data, int options) {
+ return create(path, data, options, ZkClient.TTL_NOT_SET);
+ }
+
+ @Override
+ public boolean create(String path, T data, int options, long ttl) {
String clientPath = path;
String serverPath = prependChroot(clientPath);
@@ -233,7 +239,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
try {
cache.lockWrite();
ZkBaseDataAccessor<T>.AccessResult result =
- _baseAccessor.doCreate(serverPath, data, options);
+ _baseAccessor.doCreate(serverPath, data, options, ttl);
boolean success = (result._retCode == RetCode.OK);
updateCache(cache, result._pathCreated, success, serverPath, data, ZNode.ZERO_STAT);
@@ -245,7 +251,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
}
// no cache
- return _baseAccessor.create(serverPath, data, options);
+ return _baseAccessor.create(serverPath, data, options, ttl);
}
@Override
@@ -426,6 +432,11 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
@Override
public boolean[] createChildren(List<String> paths, List<T> records, int options) {
+ return createChildren(paths, records, options, ZkClient.TTL_NOT_SET);
+ }
+
+ @Override
+ public boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl) {
final int size = paths.size();
List<String> serverPaths = prependChroot(paths);
@@ -438,7 +449,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
List<List<String>> pathsCreatedList =
new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
ZkAsyncCallbacks.CreateCallbackHandler[] createCbList =
- _baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options);
+ _baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options, ttl);
boolean[] success = new boolean[size];
for (int i = 0; i < size; i++) {
@@ -456,7 +467,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
}
// no cache
- return _baseAccessor.createChildren(serverPaths, records, options);
+ return _baseAccessor.createChildren(serverPaths, records, options, ttl);
}
@Override
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
index f473be806..0ce99d59e 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
@@ -210,6 +210,69 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase {
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}
+ @Test
+ public void testSyncCreateWithTTL() {
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+ String path = String.format("/%s/%s", _rootPath, "msg_0");
+ ZNRecord record = new ZNRecord("msg_0");
+ ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<>(_gZkClient);
+
+ boolean success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL);
+ Assert.assertFalse(success);
+ long ttl = 1L;
+ success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL, ttl);
+ Assert.assertTrue(success);
+ ZNRecord getRecord = _gZkClient.readData(path);
+ Assert.assertNotNull(getRecord);
+ Assert.assertEquals(getRecord.getId(), "msg_0");
+
+ record.setSimpleField("key0", "value0");
+ success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL, ttl);
+ Assert.assertFalse(success, "Should fail since node already exists");
+ getRecord = _gZkClient.readData(path);
+ Assert.assertNotNull(getRecord);
+ Assert.assertEquals(getRecord.getSimpleFields().size(), 0);
+
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testSyncCreateContainer() {
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+ String path = String.format("/%s/%s", _rootPath, "msg_0");
+ ZNRecord record = new ZNRecord("msg_0");
+ ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<>(_gZkClient);
+
+ boolean success = accessor.create(path, record, AccessOption.CONTAINER);
+ Assert.assertTrue(success);
+ ZNRecord getRecord = _gZkClient.readData(path);
+ Assert.assertNotNull(getRecord);
+ Assert.assertEquals(getRecord.getId(), "msg_0");
+
+ record.setSimpleField("key0", "value0");
+ success = accessor.create(path, record, AccessOption.CONTAINER);
+ Assert.assertFalse(success, "Should fail since node already exists");
+ getRecord = _gZkClient.readData(path);
+ Assert.assertNotNull(getRecord);
+ Assert.assertEquals(getRecord.getSimpleFields().size(), 0);
+
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+ System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+ }
+
@Test
public void testDefaultAccessorCreateCustomData() {
String className = TestHelper.getTestClassName();
@@ -513,6 +576,52 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase {
Assert.assertEquals(record.getId(), msgId, "Should get what we created");
}
+ // test async createChildren with TTL
+ System.setProperty("zookeeper.extendedTypesEnabled", "true");
+ records = new ArrayList<>();
+ paths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String msgId = "msg_" + i;
+ paths.add(PropertyPathBuilder.instanceMessage(root, "host_2", msgId));
+ records.add(new ZNRecord(msgId));
+ }
+ success = accessor.createChildren(paths, records, AccessOption.PERSISTENT_WITH_TTL, 1L);
+ for (int i = 0; i < 10; i++) {
+ String msgId = "msg_" + i;
+ Assert.assertTrue(success[i], "Should succeed in create " + msgId);
+ }
+
+ // test get what we created
+ for (int i = 0; i < 10; i++) {
+ String msgId = "msg_" + i;
+ String path = PropertyPathBuilder.instanceMessage(root, "host_2", msgId);
+ ZNRecord record = _gZkClient.readData(path);
+ Assert.assertEquals(record.getId(), msgId, "Should get what we created");
+ }
+
+ // test async createChildren with Container mode
+ records = new ArrayList<>();
+ paths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String msgId = "msg_" + i;
+ paths.add(PropertyPathBuilder.instanceMessage(root, "host_3", msgId));
+ records.add(new ZNRecord(msgId));
+ }
+ success = accessor.createChildren(paths, records, AccessOption.CONTAINER);
+ for (int i = 0; i < 10; i++) {
+ String msgId = "msg_" + i;
+ Assert.assertTrue(success[i], "Should succeed in create " + msgId);
+ }
+
+ // test get what we created
+ for (int i = 0; i < 10; i++) {
+ String msgId = "msg_" + i;
+ String path = PropertyPathBuilder.instanceMessage(root, "host_3", msgId);
+ ZNRecord record = _gZkClient.readData(path);
+ Assert.assertEquals(record.getId(), msgId, "Should get what we created");
+ }
+ System.clearProperty("zookeeper.extendedTypesEnabled");
+
// test async setChildren
records = new ArrayList<>();
paths = new ArrayList<>();
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
index e22fcc2d3..1567b98cc 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
@@ -67,6 +67,11 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
return set(path, record, options);
}
+ @Override
+ public boolean create(String path, ZNRecord record, int options, long ttl) {
+ return set(path, record, options);
+ }
+
@Override
public boolean set(String path, ZNRecord record, int options) {
ZNode zNode = _recordMap.get(path);
@@ -112,6 +117,12 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
return setChildren(paths, records, options);
}
+ @Override
+ public boolean[] createChildren(List<String> paths, List<ZNRecord> records,
+ int options, long ttl) {
+ return setChildren(paths, records, options);
+ }
+
@Override
public boolean[] setChildren(List<String> paths, List<ZNRecord> records, int options) {
boolean [] ret = new boolean[paths.size()];