You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ne...@apache.org on 2022/06/03 18:06:16 UTC

[helix] 04/05: Add TTL and Container modes to BaseDataAccessor and its implementations (#2107)

This is an automated email from the ASF dual-hosted git repository.

nealsun pushed a commit to branch zookeeper-api-ttlcontainer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit d820d29f85f880f1c88951503bd802309eec6801
Author: Ramin Bashizade <ra...@linkedin.com>
AuthorDate: Tue May 24 10:08:20 2022 -0700

    Add TTL and Container modes to BaseDataAccessor and its implementations (#2107)
    
    This commit adds support for TTL and Container modes to BaseDataAccessor and its implementations by taking advantage of the relevant APIs in ZkClient and its descendant classes.
---
 .../java/org/apache/helix/BaseDataAccessor.java    |  23 +++++
 .../helix/manager/zk/ZkBaseDataAccessor.java       |  66 +++++++++++--
 .../helix/manager/zk/ZkCacheBaseDataAccessor.java  |  19 +++-
 .../helix/manager/zk/TestZkBaseDataAccessor.java   | 109 +++++++++++++++++++++
 .../apache/helix/mock/MockBaseDataAccessor.java    |  11 +++
 5 files changed, 217 insertions(+), 11 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
index e556ccfed..9c639d817 100644
--- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
@@ -43,6 +43,18 @@ public interface BaseDataAccessor<T> {
    */
   boolean create(String path, T record, int options);
 
+  /**
+   * Always attempts to create the ZNode and returns false if it already exists. Parents are
+   * created if they do not exist. For performance reasons, it may try to create the child
+   * first and create the parents only if that attempt fails.
+   * @param path path to the ZNode to create
+   * @param record the data to write to the ZNode
+   * @param options the type of ZNode to create; see the valid values in {@link AccessOption}
+   * @param ttl TTL of the node in milliseconds, if the mode set in options supports TTL
+   * @return true if creation succeeded, false otherwise (e.g. if the ZNode exists)
+   */
+  boolean create(String path, T record, int options, long ttl);
+
   /**
    * This will always attempt to set the data on existing node. If the ZNode does not
    * exist it will create it and all its parents ZNodes if necessary
@@ -95,6 +107,17 @@ public interface BaseDataAccessor<T> {
    */
   boolean[] createChildren(List<String> paths, List<T> records, int options);
 
+  /**
+   * Use this when creating children under a parent node. It uses the async API for better
+   * performance. For each child that already exists, the corresponding result is false.
+   * @param paths the paths to the children ZNodes
+   * @param records list of data to write, one entry per path
+   * @param options the type of ZNode to create; see the valid values in {@link AccessOption}
+   * @param ttl TTL of the node in milliseconds, if the mode set in options supports TTL
+   * @return For each child: true if creation succeeded, false otherwise (e.g. if the child exists)
+   */
+  boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl);
+
   /**
    * can set multiple children under a parent node. This will use async api for better
    * performance. If this child does not exist it will create it.
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 4e40d413c..0c7a94f1b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -47,6 +47,7 @@ import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
 import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
 import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
@@ -249,7 +250,15 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
    */
   @Override
   public boolean create(String path, T record, int options) {
-    AccessResult result = doCreate(path, record, options);
+    return create(path, record, options, ZkClient.TTL_NOT_SET);
+  }
+
+  /**
+   * sync create with TTL
+   */
+  @Override
+  public boolean create(String path, T record, int options, long ttl) {
+    AccessResult result = doCreate(path, record, options, ttl);
     return result._retCode == RetCode.OK;
   }
 
@@ -257,6 +266,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
    * sync create
    */
   public AccessResult doCreate(String path, T record, int options) {
+    return doCreate(path, record, options, ZkClient.TTL_NOT_SET);
+  }
+
+  /**
+   * sync create with TTL
+   */
+  public AccessResult doCreate(String path, T record, int options, long ttl) {
     AccessResult result = new AccessResult();
     CreateMode mode = AccessOption.getMode(options);
     if (mode == null) {
@@ -269,7 +285,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     do {
       retry = false;
       try {
-        _zkClient.create(path, record, mode);
+        _zkClient.create(path, record, mode, ttl);
         result._pathCreated.add(path);
 
         result._retCode = RetCode.OK;
@@ -278,7 +294,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         // this will happen if parent node does not exist
         String parentPath = HelixUtil.getZkParentPath(path);
         try {
-          AccessResult res = doCreate(parentPath, null, AccessOption.PERSISTENT);
+          AccessResult res;
+          if (mode.isTTL()) {
+            res = doCreate(parentPath, null, options, ttl);
+          }  else if (mode.isContainer()) {
+            res = doCreate(parentPath, null, AccessOption.CONTAINER);
+          } else {
+            res = doCreate(parentPath, null, AccessOption.PERSISTENT);
+          }
           result._pathCreated.addAll(res._pathCreated);
           RetCode rc = res._retCode;
           if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS) {
@@ -720,6 +743,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
    */
   ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records,
       boolean[] needCreate, List<List<String>> pathsCreated, int options) {
+    return create(paths, records, needCreate, pathsCreated, options, ZkClient.TTL_NOT_SET);
+  }
+
+  /**
+   * async create with TTL. give up on error other than NONODE
+   */
+  ZkAsyncCallbacks.CreateCallbackHandler[] create(List<String> paths, List<T> records,
+      boolean[] needCreate, List<List<String>> pathsCreated, int options, long ttl) {
     if ((records != null && records.size() != paths.size()) || needCreate.length != paths.size()
         || (pathsCreated != null && pathsCreated.size() != paths.size())) {
       throw new IllegalArgumentException(
@@ -747,7 +778,11 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         String path = paths.get(i);
         T record = records == null ? null : records.get(i);
         cbList[i] = new ZkAsyncCallbacks.CreateCallbackHandler();
-        _zkClient.asyncCreate(path, record, mode, cbList[i]);
+        if (mode.isTTL()) {
+          _zkClient.asyncCreate(path, record, mode, ttl, cbList[i]);
+        } else {
+          _zkClient.asyncCreate(path, record, mode, cbList[i]);
+        }
       }
 
       List<String> parentPaths = new ArrayList<>(Collections.<String>nCopies(paths.size(), null));
@@ -784,8 +819,16 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       if (failOnNoNode) {
         boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length);
 
-        ZkAsyncCallbacks.CreateCallbackHandler[] parentCbList =
-            create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
+        ZkAsyncCallbacks.CreateCallbackHandler[] parentCbList;
+        if (mode.isTTL()) {
+          parentCbList = create(parentPaths, null, needCreateParent, pathsCreated, options, ttl);
+        } else if (mode.isContainer()) {
+          parentCbList =
+              create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.CONTAINER);
+        } else {
+          parentCbList =
+              create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
+        }
         for (int i = 0; i < parentCbList.length; i++) {
           ZkAsyncCallbacks.CreateCallbackHandler parentCb = parentCbList[i];
           if (parentCb == null) {
@@ -812,6 +855,15 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
    */
   @Override
   public boolean[] createChildren(List<String> paths, List<T> records, int options) {
+    return createChildren(paths, records, options, ZkClient.TTL_NOT_SET);
+  }
+
+  /**
+   * async create with TTL
+   * TODO: rename to create
+   */
+  @Override
+  public boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl) {
     boolean[] success = new boolean[paths.size()];
 
     CreateMode mode = AccessOption.getMode(options);
@@ -829,7 +881,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     try {
 
       ZkAsyncCallbacks.CreateCallbackHandler[] cbList =
-          create(paths, records, needCreate, pathsCreated, options);
+          create(paths, records, needCreate, pathsCreated, options, ttl);
 
       for (int i = 0; i < cbList.length; i++) {
         ZkAsyncCallbacks.CreateCallbackHandler cb = cbList[i];
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index e16519958..6a635a5da 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -38,6 +38,7 @@ import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
 import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
@@ -225,6 +226,11 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
 
   @Override
   public boolean create(String path, T data, int options) {
+    return create(path, data, options, ZkClient.TTL_NOT_SET);
+  }
+
+  @Override
+  public boolean create(String path, T data, int options, long ttl) {
     String clientPath = path;
     String serverPath = prependChroot(clientPath);
 
@@ -233,7 +239,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
       try {
         cache.lockWrite();
         ZkBaseDataAccessor<T>.AccessResult result =
-            _baseAccessor.doCreate(serverPath, data, options);
+            _baseAccessor.doCreate(serverPath, data, options, ttl);
         boolean success = (result._retCode == RetCode.OK);
 
         updateCache(cache, result._pathCreated, success, serverPath, data, ZNode.ZERO_STAT);
@@ -245,7 +251,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
     }
 
     // no cache
-    return _baseAccessor.create(serverPath, data, options);
+    return _baseAccessor.create(serverPath, data, options, ttl);
   }
 
   @Override
@@ -426,6 +432,11 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
 
   @Override
   public boolean[] createChildren(List<String> paths, List<T> records, int options) {
+    return createChildren(paths, records, options, ZkClient.TTL_NOT_SET);
+  }
+
+  @Override
+  public boolean[] createChildren(List<String> paths, List<T> records, int options, long ttl) {
     final int size = paths.size();
     List<String> serverPaths = prependChroot(paths);
 
@@ -438,7 +449,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
         List<List<String>> pathsCreatedList =
             new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
         ZkAsyncCallbacks.CreateCallbackHandler[] createCbList =
-            _baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options);
+            _baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options, ttl);
 
         boolean[] success = new boolean[size];
         for (int i = 0; i < size; i++) {
@@ -456,7 +467,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
     }
 
     // no cache
-    return _baseAccessor.createChildren(serverPaths, records, options);
+    return _baseAccessor.createChildren(serverPaths, records, options, ttl);
   }
 
   @Override
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
index f473be806..0ce99d59e 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java
@@ -210,6 +210,69 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase {
     System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
   }
 
+  @Test
+  public void testSyncCreateWithTTL() {
+    System.setProperty("zookeeper.extendedTypesEnabled", "true");
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    String path = String.format("/%s/%s", _rootPath, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<>(_gZkClient);
+
+    boolean success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL);
+    Assert.assertFalse(success);
+    long ttl = 1L;
+    success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL, ttl);
+    Assert.assertTrue(success);
+    ZNRecord getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_0");
+
+    record.setSimpleField("key0", "value0");
+    success = accessor.create(path, record, AccessOption.PERSISTENT_WITH_TTL, ttl);
+    Assert.assertFalse(success, "Should fail since node already exists");
+    getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getSimpleFields().size(), 0);
+
+    System.clearProperty("zookeeper.extendedTypesEnabled");
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testSyncCreateContainer() {
+    System.setProperty("zookeeper.extendedTypesEnabled", "true");
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String testName = className + "_" + methodName;
+
+    System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+    String path = String.format("/%s/%s", _rootPath, "msg_0");
+    ZNRecord record = new ZNRecord("msg_0");
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<>(_gZkClient);
+
+    boolean success = accessor.create(path, record, AccessOption.CONTAINER);
+    Assert.assertTrue(success);
+    ZNRecord getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getId(), "msg_0");
+
+    record.setSimpleField("key0", "value0");
+    success = accessor.create(path, record, AccessOption.CONTAINER);
+    Assert.assertFalse(success, "Should fail since node already exists");
+    getRecord = _gZkClient.readData(path);
+    Assert.assertNotNull(getRecord);
+    Assert.assertEquals(getRecord.getSimpleFields().size(), 0);
+
+    System.clearProperty("zookeeper.extendedTypesEnabled");
+    System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+  }
+
   @Test
   public void testDefaultAccessorCreateCustomData() {
     String className = TestHelper.getTestClassName();
@@ -513,6 +576,52 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase {
       Assert.assertEquals(record.getId(), msgId, "Should get what we created");
     }
 
+    // test async createChildren with TTL
+    System.setProperty("zookeeper.extendedTypesEnabled", "true");
+    records = new ArrayList<>();
+    paths = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      String msgId = "msg_" + i;
+      paths.add(PropertyPathBuilder.instanceMessage(root, "host_2", msgId));
+      records.add(new ZNRecord(msgId));
+    }
+    success = accessor.createChildren(paths, records, AccessOption.PERSISTENT_WITH_TTL, 1L);
+    for (int i = 0; i < 10; i++) {
+      String msgId = "msg_" + i;
+      Assert.assertTrue(success[i], "Should succeed in create " + msgId);
+    }
+
+    // test get what we created
+    for (int i = 0; i < 10; i++) {
+      String msgId = "msg_" + i;
+      String path = PropertyPathBuilder.instanceMessage(root, "host_2", msgId);
+      ZNRecord record = _gZkClient.readData(path);
+      Assert.assertEquals(record.getId(), msgId, "Should get what we created");
+    }
+
+    // test async createChildren with Container mode
+    records = new ArrayList<>();
+    paths = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      String msgId = "msg_" + i;
+      paths.add(PropertyPathBuilder.instanceMessage(root, "host_3", msgId));
+      records.add(new ZNRecord(msgId));
+    }
+    success = accessor.createChildren(paths, records, AccessOption.CONTAINER);
+    for (int i = 0; i < 10; i++) {
+      String msgId = "msg_" + i;
+      Assert.assertTrue(success[i], "Should succeed in create " + msgId);
+    }
+
+    // test get what we created
+    for (int i = 0; i < 10; i++) {
+      String msgId = "msg_" + i;
+      String path = PropertyPathBuilder.instanceMessage(root, "host_3", msgId);
+      ZNRecord record = _gZkClient.readData(path);
+      Assert.assertEquals(record.getId(), msgId, "Should get what we created");
+    }
+    System.clearProperty("zookeeper.extendedTypesEnabled");
+
     // test async setChildren
     records = new ArrayList<>();
     paths = new ArrayList<>();
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
index e22fcc2d3..1567b98cc 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
@@ -67,6 +67,11 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
     return set(path, record, options);
   }
 
+  @Override
+  public boolean create(String path, ZNRecord record, int options, long ttl) {
+    return set(path, record, options);
+  }
+
   @Override
   public boolean set(String path, ZNRecord record, int options) {
     ZNode zNode = _recordMap.get(path);
@@ -112,6 +117,12 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
     return setChildren(paths, records, options);
   }
 
+  @Override
+  public boolean[] createChildren(List<String> paths, List<ZNRecord> records,
+      int options, long ttl) {
+    return setChildren(paths, records, options);
+  }
+
   @Override
   public boolean[] setChildren(List<String> paths, List<ZNRecord> records, int options) {
     boolean [] ret = new boolean[paths.size()];