You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hz...@apache.org on 2020/12/18 03:53:12 UTC

[helix] branch master updated: Leverage zk paginated getChildren API for ZkClient to fetch a large number of children (#1526)

This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 836962d  Leverage zk paginated getChildren API for ZkClient to fetch a large number of children (#1526)
836962d is described below

commit 836962ddad4f1702775a1f001e6e554520543195
Author: Huizhi Lu <ih...@gmail.com>
AuthorDate: Thu Dec 17 19:53:02 2020 -0800

    Leverage zk paginated getChildren API for ZkClient to fetch a large number of children (#1526)
    
    If a node has many children and the packet length exceeds jute.maxbuffer, ZkClient's getChildren operation fails. In some cases, users expect to fetch all of the children successfully. Pagination can be used to support this, but it requires the native ZooKeeper client to provide the paginated getChildren API.
    
    This change uses reflection to determine whether getChildren should use the native ZooKeeper pagination API.
---
 zookeeper-api/pom.xml                              |   5 +
 .../zookeeper/constant/ZkSystemPropertyKeys.java   |  17 ++++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  |  14 +++
 .../helix/zookeeper/zkclient/ZkConnection.java     | 104 ++++++++++++++++++++-
 .../helix/zookeeper/zkclient/TestZkConnection.java | 102 ++++++++++++++++++++
 5 files changed, 237 insertions(+), 5 deletions(-)

diff --git a/zookeeper-api/pom.xml b/zookeeper-api/pom.xml
index ed37e45..5ee5296 100644
--- a/zookeeper-api/pom.xml
+++ b/zookeeper-api/pom.xml
@@ -109,6 +109,11 @@
       <version>2.6</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <resources>
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
index d4495c6..1121554 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
@@ -63,4 +63,21 @@ public class ZkSystemPropertyKeys {
 
   /** System property key for jute.maxbuffer */
   public static final String JUTE_MAXBUFFER = "jute.maxbuffer";
+
+  /**
+   * Setting this system property to {@code true} forces Helix ZkClient to use the
+   * <b>non-paginated</b> {@code getChildren} API, regardless of whether ZooKeeper supports
+   * pagination.
+   * <p>
+   * If it is {@code false} and both the ZooKeeper client and server support the
+   * <b>paginated</b> {@code getChildren} API, Helix ZkClient's {@code getChildren} calls
+   * ZooKeeper's <b>paginated</b> {@code getChildren} API.
+   * <p>
+   * The default value is {@code false}.
+   * <p>
+   * Note: use this config with caution; it may be deprecated soon.
+   */
+  // TODO: deprecate this config after the paginated API is deployed and stable
+  public static final String ZK_GETCHILDREN_PAGINATION_DISABLED =
+      "zk.getChildren.pagination.disabled";
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index b5b86c0..7cef694 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -1012,6 +1012,20 @@ public class ZkClient implements Watcher {
     }
   }
 
+  /**
+   * Returns the list of children of the given path.
+   * <p>
+   * NOTE: if the given path has so many children that the network packet length exceeds
+   * {@code jute.maxbuffer}, the behavior depends on whether the native ZooKeeper supports the
+   * paginated getChildren API and on the config
+   * {@link ZkSystemPropertyKeys#ZK_GETCHILDREN_PAGINATION_DISABLED}:
+   * <p>1) pagination is disabled (the config is set to {@code true}) or ZooKeeper does not
+   * support pagination: the operation fails.
+   * <p>2) the config is {@code false} and ZooKeeper supports pagination: all children are fetched
+   * page by page and returned. Note that the resulting list is NOT strongly consistent with the
+   * server - it may contain children that were deleted before the last page was fetched, and
+   * upstream callers must be able to handle this.
+   */
   public List<String> getChildren(String path) {
     return getChildren(path, hasListeners(path));
   }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
index 3c426a5..08a2fb9 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
@@ -20,12 +20,15 @@ package org.apache.helix.zookeeper.zkclient;
  */
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.List;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
-import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
@@ -36,16 +39,24 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ZkConnection implements IZkConnection {
-  private static final Logger LOG = Logger.getLogger(ZkConnection.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ZkConnection.class);
 
   /** It is recommended to use quite large sessions timeouts for ZooKeeper. */
   private static final int DEFAULT_SESSION_TIMEOUT = 30000;
 
-  private ZooKeeper _zk = null;
+  // A config to force disabling using ZK's paginated getChildren.
+  // By default the value is false.
+  private static final boolean GETCHILDREN_PAGINATION_DISABLED =
+      Boolean.getBoolean(ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED);
+
+  @VisibleForTesting
+  protected ZooKeeper _zk = null;
   private Lock _zookeeperLock = new ReentrantLock();
+  private volatile Method _getChildrenMethod; // set lazily and reset on server fallback
 
   private final String _servers;
   private final int _sessionTimeOut;
@@ -132,10 +143,44 @@ public class ZkConnection implements IZkConnection {
     return _zk.exists(path, watch) != null;
   }
 
+  /**
+   * Returns a list of children of the given path.
+   * <p>
+   * If {@code watch} is {@code true} and the call is successful (no exception is thrown),
+   * a watch will be left on the node with the given path.
+   * <p>
+   * The implementation uses Java reflection to check whether the native ZooKeeper client
+   * exposes the paginated getChildren API ({@code getAllChildrenPaginated}):
+   * <p>- if yes, and {@link #GETCHILDREN_PAGINATION_DISABLED} is false, call the paginated API;
+   * <p>- otherwise, fall back to the non-paginated API.
+   *
+   * @param path the path of the node
+   * @param watch whether a watch should be left on the node
+   * @return a list of children of the given path
+   * @throws KeeperException if the server signals an error, or APIERROR if reflection fails
+   * @throws InterruptedException if the server transaction is interrupted
+   */
   @Override
   public List<String> getChildren(final String path, final boolean watch)
       throws KeeperException, InterruptedException {
-    return _zk.getChildren(path, watch);
+    if (_getChildrenMethod == null) {
+      lookupGetChildrenMethod();
+    }
+
+    try {
+      // Both candidate ZooKeeper methods are expected to return List<String>.
+      @SuppressWarnings("unchecked")
+      List<String> children = (List<String>) _getChildrenMethod.invoke(_zk, path, watch);
+      return children;
+    } catch (InvocationTargetException e) {
+      // Unwrap and translate the exception thrown by the invoked ZooKeeper method.
+      handleInvokedMethodException(e.getCause());
+    } catch (IllegalAccessException e) {
+      // Log the exception to understand the detailed reason.
+      LOG.error("Unable to get children for {}", path, e);
+    }
+    // If it reaches here, something must be wrong with the API.
+    throw KeeperException.create(KeeperException.Code.APIERROR, path);
   }
 
   @Override
@@ -192,4 +237,53 @@ public class ZkConnection implements IZkConnection {
   public void addAuthInfo(String scheme, byte[] auth) {
     _zk.addAuthInfo(scheme, auth);
   }
+
+  private void lookupGetChildrenMethod() {
+    Method method = doLookUpGetChildrenMethod();
+    _getChildrenMethod = method;
+    LOG.info("Pagination config {}={}, method to be invoked: {}",
+        ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED, GETCHILDREN_PAGINATION_DISABLED,
+        method.getName());
+  }
+
+  private Method doLookUpGetChildrenMethod() {
+    if (!GETCHILDREN_PAGINATION_DISABLED) {
+      try {
+        // Look up the paginated getChildren API; absent in ZK clients without pagination.
+        return ZooKeeper.class.getMethod("getAllChildrenPaginated", String.class, boolean.class);
+      } catch (NoSuchMethodException e) {
+        LOG.info("Paginated getChildren is not supported, fall back to non-paginated getChildren");
+      }
+    }
+
+    return lookupNonPaginatedGetChildren();
+  }
+
+  private Method lookupNonPaginatedGetChildren() {
+    try {
+      return ZooKeeper.class.getMethod("getChildren", String.class, boolean.class);
+    } catch (NoSuchMethodException e) {
+      // Not expected; wrap e itself since e.getCause() is typically null for getMethod.
+      throw ExceptionUtil.convertToRuntimeException(e);
+    }
+  }
+
+  private void handleInvokedMethodException(Throwable cause)
+      throws KeeperException, InterruptedException {
+    if (cause instanceof KeeperException.UnimplementedException) {
+      LOG.warn("Paginated getChildren is unimplemented in ZK server! "
+          + "Falling back to non-paginated getChildren");
+      _getChildrenMethod = lookupNonPaginatedGetChildren();
+      // ZK server would disconnect this connection because of UnimplementedException.
+      // Throw CONNECTIONLOSS so ZkClient can retry.
+      // TODO: handle it in a better way without throwing CONNECTIONLOSS
+      throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
+    } else if (cause instanceof KeeperException) {
+      throw (KeeperException) cause; // Rethrow as-is to keep the path and stack trace.
+    } else if (cause instanceof InterruptedException) {
+      throw (InterruptedException) cause; // Rethrow as-is to keep the stack trace.
+    } else {
+      throw ExceptionUtil.convertToRuntimeException(cause);
+    }
+  }
 }
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkConnection.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkConnection.java
new file mode 100644
index 0000000..ebdd1c1
--- /dev/null
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkConnection.java
@@ -0,0 +1,102 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestZkConnection {
+  @Test
+  public void testGetChildren() throws KeeperException, InterruptedException {
+    MockZkConnection zkConnection = new MockZkConnection("dummy.server");
+    ZooKeeper zk = mock(ZooKeeper.class);
+    zkConnection.setZookeeper(zk);
+    // NOTE(review): only getChildren is stubbed; assumes the paginated API is not resolved.
+    List<String> children = Arrays.asList("1", "2", "3");
+    when(zk.getChildren(anyString(), anyBoolean())).thenReturn(children);
+
+    List<String> actualChildren = zkConnection.getChildren("/path", false);
+
+    Assert.assertEquals(actualChildren, children);
+  }
+
+  @Test
+  public void testGetChildrenException() throws KeeperException, InterruptedException {
+    MockZkConnection zkConnection = new MockZkConnection("dummy.server");
+    ZooKeeper zk = mock(ZooKeeper.class);
+    zkConnection.setZookeeper(zk);
+    // Each getChildren call below consumes the next stubbed exception, in order.
+    when(zk.getChildren(anyString(), anyBoolean()))
+        .thenThrow(KeeperException.create(KeeperException.Code.CONNECTIONLOSS))
+        .thenThrow(KeeperException.create(KeeperException.Code.UNIMPLEMENTED))
+        .thenThrow(new InterruptedException())
+        .thenThrow(new IllegalStateException());
+
+    try {
+      zkConnection.getChildren("/path", false);
+      Assert.fail("Should have thrown exception.");
+    } catch (KeeperException.ConnectionLossException expected) {
+      // CONNECTIONLOSS from ZooKeeper surfaces as ConnectionLossException
+    }
+
+    try {
+      zkConnection.getChildren("/path", false);
+      Assert.fail("Should have thrown exception.");
+    } catch (KeeperException.ConnectionLossException expected) {
+      // UNIMPLEMENTED is converted to CONNECTIONLOSS so that ZkClient can retry
+    }
+
+    try {
+      zkConnection.getChildren("/path", false);
+      Assert.fail("Should have thrown exception.");
+    } catch (InterruptedException expected) {
+      // InterruptedException is propagated to the caller
+    }
+
+    try {
+      zkConnection.getChildren("/path", false);
+      Assert.fail("Should have thrown exception.");
+    } catch (RuntimeException expected) {
+      // Any other exception is converted to a RuntimeException
+    }
+  }
+
+  private static class MockZkConnection extends ZkConnection {
+
+    public MockZkConnection(String zkServers) {
+      super(zkServers);
+    }
+    // Injects the given (e.g. mocked) ZooKeeper into the protected _zk field.
+    public void setZookeeper(ZooKeeper zk) {
+      _zk = zk;
+    }
+  }
+}