You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/01/25 21:49:05 UTC

[14/50] [abbrv] helix git commit: Support new API for getChildren with retry logic.

Support new API for getChildren with retry logic.

The current getChildren silently drops a znode from the returned list if its data could not be read, so it can return only part of the data the application expects. This is a problem for applications that need complete data.

The new API adds retry logic: if Helix still fails to read all of the data from ZooKeeper after the given number of retries, it throws an exception.

TODO: Helix will change the old API's behavior when it starts migrating APIs.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/79c96ba6
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/79c96ba6
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/79c96ba6

Branch: refs/heads/master
Commit: 79c96ba6b75c240c601e42ca7031ad1f855e1bc9
Parents: d1cbfdb
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Nov 9 15:05:04 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Jan 24 18:31:21 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/BaseDataAccessor.java | 21 +++++++
 .../helix/manager/zk/ZkBaseDataAccessor.java    | 63 ++++++++++++++++----
 .../manager/zk/ZkCacheBaseDataAccessor.java     | 11 +++-
 .../apache/helix/mock/MockBaseDataAccessor.java |  7 +++
 4 files changed, 89 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/79c96ba6/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
index a8f2907..c42c700 100644
--- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
@@ -142,6 +142,11 @@ public interface BaseDataAccessor<T> {
 
   /**
    * Get the children under a parent path using async api
+   *
+   * If some child nodes cannot be read, Helix returns only the data of the nodes that were read
+   * successfully, so the caller may receive partial data. No exception is thrown, even when not
+   * all of the data could be read.
+   *
    * @param parentPath path to the immediate parent ZNode
    * @param stats Zookeeper Stat objects corresponding to each child
    * @param options Set the type of ZNode see the valid values in {@link AccessOption}
@@ -150,6 +155,22 @@ public interface BaseDataAccessor<T> {
   List<T> getChildren(String parentPath, List<Stat> stats, int options);
 
   /**
+   * Get the children under a parent path using async api, retrying reads that did not complete.
+   *
+   * If some child nodes fail to be read, Helix retries up to {@code retryCount} times. If the
+   * result still cannot be retrieved completely, a {@link HelixException} is thrown.
+   *
+   * @param parentPath path to the immediate parent ZNode
+   * @param stats Zookeeper Stat objects corresponding to each child
+   * @param options Set the type of ZNode see the valid values in {@link AccessOption}
+   * @param retryCount maximum number of retries when not all child data could be read
+   * @param retryInterval time to wait between two retries, in milliseconds
+   * @return A list of children of the parent ZNode
+   */
+  List<T> getChildren(String parentPath, List<Stat> stats, int options, int retryCount,
+      int retryInterval) throws HelixException;
+
+  /**
    * Returns the child names given a parent path
    * @param parentPath path to the immediate parent ZNode
    * @param options Set the type of ZNode see the valid values in {@link AccessOption}

http://git-wip-us.apache.org/repos/asf/helix/blob/79c96ba6/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 842f0c0..0ccda43 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
@@ -34,6 +33,7 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
@@ -42,12 +42,12 @@ import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
 import org.apache.helix.store.zk.ZNode;
 import org.apache.helix.util.HelixUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   enum RetCode {
@@ -331,13 +331,13 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     boolean[] needRead = new boolean[paths.size()];
     Arrays.fill(needRead, true);
 
-    return get(paths, stats, needRead);
+    return get(paths, stats, needRead, false);
   }
 
   /**
    * async get
    */
-  List<T> get(List<String> paths, List<Stat> stats, boolean[] needRead) {
+  List<T> get(List<String> paths, List<Stat> stats, boolean[] needRead, boolean throwException) {
     if (paths == null || paths.size() == 0) {
       return Collections.emptyList();
     }
@@ -373,7 +373,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
       // construct return results
       List<T> records = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
-
+      StringBuilder nodeFailToRead = new StringBuilder();
       for (int i = 0; i < paths.size(); i++) {
         if (!needRead[i])
           continue;
@@ -386,9 +386,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
           if (stats != null) {
             stats.set(i, cb._stat);
           }
+        } else if (Code.get(cb.getRc()) != Code.NONODE && throwException) {
+          throw new HelixException(String.format("Failed to read node %s", paths.get(i)));
+        } else {
+          nodeFailToRead.append(nodeFailToRead.length() == 0 ? "" : ",").append(paths.get(i));
         }
       }
-
+      if (nodeFailToRead.length() > 0)
+        LOG.warn("Fail to read nodes for paths : {}", nodeFailToRead);
       return records;
     } finally {
       long endT = System.nanoTime();
@@ -401,9 +406,42 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
   /**
    * asyn getChildren
+   * Children that cannot be read are left out, so the result may be partial; no retry is done.
    */
+  // TODO: Change the behavior of getChildren when Helix starts migrating APIs.
   @Override
   public List<T> getChildren(String parentPath, List<Stat> stats, int options) {
+    return getChildren(parentPath, stats, options, false);
+  }
+
+  /** Like the 3-arg getChildren, but retries failed reads and throws if they keep failing. */
+  @Override
+  public List<T> getChildren(String parentPath, List<Stat> stats, int options, int retryCount,
+      int retryInterval) throws HelixException {
+    // Always read at least once; a negative retryCount means "no retries", not "no reads".
+    int attemptsLeft = Math.max(retryCount, 0) + 1;
+    while (true) {
+      try {
+        return getChildren(parentPath, stats, options, true);
+      } catch (HelixException e) {
+        if (--attemptsLeft <= 0) {
+          throw new HelixException(String.format("Failed to get full list of %s", parentPath), e);
+        }
+      }
+      try {
+        Thread.sleep(Math.max(retryInterval, 0));
+      } catch (InterruptedException interruptedException) {
+        // Restore the interrupt status so callers further up can still observe it.
+        Thread.currentThread().interrupt();
+        throw new HelixException(
+            String.format("Interrupted while retrying to read children of %s", parentPath),
+            interruptedException);
+      }
+    }
+  }
+
+  private List<T> getChildren(String parentPath, List<Stat> stats, int options,
+      boolean throwException) {
     try {
       // prepare child paths
       List<String> childNames = getChildNames(parentPath, options);
@@ -411,15 +449,17 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         return Collections.emptyList();
       }
 
-      List<String> paths = new ArrayList<String>();
+      List<String> paths = new ArrayList<>();
       for (String childName : childNames) {
         String path = parentPath + "/" + childName;
         paths.add(path);
       }
 
       // remove null record
-      List<Stat> curStats = new ArrayList<Stat>(paths.size());
-      List<T> records = get(paths, curStats, options);
+      List<Stat> curStats = new ArrayList<>(paths.size());
+      boolean[] needRead = new boolean[paths.size()];
+      Arrays.fill(needRead, true);
+      List<T> records = get(paths, curStats, needRead, throwException);
       Iterator<T> recordIter = records.iterator();
       Iterator<Stat> statIter = curStats.iterator();
       while (statIter.hasNext()) {
@@ -806,7 +846,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
         // asycn read all data
         List<Stat> curStats = new ArrayList<Stat>();
-        List<T> curDataList = get(paths, curStats, Arrays.copyOf(needUpdate, needUpdate.length));
+        List<T> curDataList =
+            get(paths, curStats, Arrays.copyOf(needUpdate, needUpdate.length), false);
 
         // async update
         List<T> newDataList = new ArrayList<T>();

http://git-wip-us.apache.org/repos/asf/helix/blob/79c96ba6/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 748e090..18abe91 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -36,6 +36,7 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
@@ -600,7 +601,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
       if (needRead) {
         cache.lockWrite();
         try {
-          List<T> readRecords = _baseAccessor.get(serverPaths, readStats, needReads);
+          List<T> readRecords = _baseAccessor.get(serverPaths, readStats, needReads, false);
           for (int i = 0; i < size; i++) {
             if (needReads[i]) {
               records.set(i, readRecords.get(i));
@@ -672,7 +673,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
       return null;
     }
 
-    List<String> paths = new ArrayList<String>();
+    List<String> paths = new ArrayList<>();
     for (String childName : childNames) {
       String path = parentPath.equals("/") ? "/" + childName : parentPath + "/" + childName;
       paths.add(path);
@@ -682,6 +683,12 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
   }
 
   @Override
+  public List<T> getChildren(String parentPath, List<Stat> stats, int options,
+      int retryCount, int retryInterval) throws HelixException {
+    return getChildren(parentPath, stats, options); // NOTE(review): retry args ignored; TODO
+  }
+
+  @Override
   public void subscribeDataChanges(String path, IZkDataListener listener) {
     String serverPath = prependChroot(path);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/79c96ba6/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
index 77da401..099b3fc 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java
@@ -29,6 +29,7 @@ import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
 import org.apache.zookeeper.data.Stat;
 
@@ -181,6 +182,12 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> {
   }
 
   @Override
+  public List<ZNRecord> getChildren(String parentPath, List<Stat> stats, int options,
+      int retryCount, int retryInterval) throws HelixException {
+    return getChildren(parentPath, stats, options); // mock: retryCount/retryInterval unused
+  }
+
+  @Override
   public List<String> getChildNames(String parentPath, int options) {
     List<String> child = new ArrayList<>();
     for (String key : _recordMap.keySet()) {