You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/11/16 23:29:04 UTC

[helix] branch helix-0.9.x updated: Backport: Validate data write size limit in ZkClient #1072

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch helix-0.9.x
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/helix-0.9.x by this push:
     new 982c358  Backport: Validate data write size limit in ZkClient #1072
982c358 is described below

commit 982c35867893985768ac4a6de86e1e64b5095e1f
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Tue Nov 16 15:28:43 2021 -0800

    Backport: Validate data write size limit in ZkClient #1072
---
 .../java/org/apache/helix/SystemPropertyKeys.java  |  4 ++
 .../helix/manager/zk/zookeeper/ZkClient.java       | 40 +++++++++++++----
 .../java/org/apache/helix/util/ZNRecordUtil.java   |  2 +-
 .../src/test/java/org/apache/helix/TestHelper.java |  8 ++++
 .../apache/helix/manager/zk/TestRawZkClient.java   | 52 +++++++++++++++++++---
 5 files changed, 91 insertions(+), 15 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
index e2ce8e0..1288484 100644
--- a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -64,6 +64,10 @@ public class SystemPropertyKeys {
   // Indicate monitoring level of the HelixManager metrics
   public static final String MONITOR_LEVEL = "helixmanager.monitorLevel";
 
+
+  /** System property key for jute.maxbuffer */
+  public static final String JUTE_MAXBUFFER = "jute.maxbuffer";
+
   // CallbackHandler
   public static final String ASYNC_BATCH_MODE_ENABLED = "helix.callbackhandler.isAsyncBatchModeEnabled";
 
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 6f7c8ef..5196357 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -40,6 +40,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.I0Itec.zkclient.exception.ZkTimeoutException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.HelixException;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.manager.zk.BasicZkSerializer;
@@ -48,6 +49,7 @@ import org.apache.helix.manager.zk.ZkAsyncCallbacks;
 import org.apache.helix.manager.zk.zookeeper.ZkEventThread.ZkEvent;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
 import org.apache.helix.util.ExponentialBackoffStrategy;
+import org.apache.helix.util.ZNRecordUtil;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -81,6 +83,11 @@ public class ZkClient implements Watcher {
   // TODO: remove it once we have a better way to exit retry for this case
   private static final int NUM_CHILDREN_LIMIT = 100 * 1000;
 
+  // ZNode write size limit in bytes, resolved once when this class is initialized.
+  // Taken from the jute.maxbuffer system property when it is set to a valid integer;
+  // otherwise falls back to ZNRecord.SIZE_LIMIT.
+  // TODO: use ZKConfig#JUTE_MAXBUFFER once bumping up ZK to 3.5.2+
+  private static final int WRITE_SIZE_LIMIT =
+      Integer.getInteger(SystemPropertyKeys.JUTE_MAXBUFFER, ZNRecord.SIZE_LIMIT);
+
   private final IZkConnection _connection;
   private final long _operationRetryTimeoutInMillis;
   private final Map<String, Set<IZkChildListener>> _childListener = new ConcurrentHashMap<>();
@@ -179,6 +186,9 @@ public class ZkClient implements Watcher {
     if (zkConnection == null) {
       throw new NullPointerException("Zookeeper connection is null!");
     }
+
+    validateWriteSizeLimitConfig();
+
     _connection = zkConnection;
     _pathBasedZkSerializer = zkSerializer;
     _operationRetryTimeoutInMillis = operationRetryTimeout;
@@ -538,7 +548,7 @@ public class ZkClient implements Watcher {
     long startT = System.currentTimeMillis();
     try {
       final byte[] data = datat == null ? null : serialize(datat, path);
-      checkDataSizeLimit(data);
+      checkDataSizeLimit(path, data);
       String actualPath = retryUntilConnected(new Callable<String>() {
         @Override
         public String call() throws Exception {
@@ -1403,7 +1413,7 @@ public class ZkClient implements Watcher {
     long startT = System.currentTimeMillis();
     try {
       final byte[] data = serialize(datat, path);
-      checkDataSizeLimit(data);
+      checkDataSizeLimit(path, data);
       final Stat stat = (Stat) retryUntilConnected(new Callable<Object>() {
         @Override
         public Object call() throws Exception {
@@ -1506,11 +1516,15 @@ public class ZkClient implements Watcher {
     });
   }
 
-  private void checkDataSizeLimit(byte[] data) {
-    if (data != null && data.length > ZNRecord.SIZE_LIMIT) {
-      LOG.error("Data size larger than 1M, will not write to zk. Data (first 1k): "
-          + new String(data).substring(0, 1024));
-      throw new HelixException("Data size larger than 1M");
+  /**
+   * Rejects a serialized payload whose length exceeds {@code WRITE_SIZE_LIMIT}.
+   *
+   * @param path znode path the data is destined for; used only to build the error message
+   * @param data serialized bytes about to be written; {@code null} is accepted and skips the check
+   * @throws HelixException if {@code data.length} is greater than {@code WRITE_SIZE_LIMIT}
+   */
+  private void checkDataSizeLimit(String path, byte[] data) {
+    if (data == null) {
+      return;
+    }
+
+    if (data.length > WRITE_SIZE_LIMIT) {
+      throw new HelixException("Data size of path " + path
+          + " is greater than write size limit "
+          + WRITE_SIZE_LIMIT + " bytes");
     }
   }
 
@@ -1803,7 +1817,7 @@ public class ZkClient implements Watcher {
 
     if (stat.getNumChildren() > NUM_CHILDREN_LIMIT) {
       LOG.error("Failed to get children for path {} because of connection loss. "
               + "Number of children {} exceeds limit {}, aborting retry.", path,
           stat.getNumChildren(),
           NUM_CHILDREN_LIMIT);
       // MarshallingErrorException could represent transport error: exceeding the
@@ -1815,4 +1829,14 @@ public class ZkClient implements Watcher {
           stat.getNumChildren(), NUM_CHILDREN_LIMIT);
     }
   }
+
+  /**
+   * Checks that the ZNRecord serializer's write size limit does not exceed this client's
+   * {@code WRITE_SIZE_LIMIT}. Both values are logged at INFO level before the comparison.
+   *
+   * @throws IllegalStateException if the serializer limit is greater than
+   *           {@code WRITE_SIZE_LIMIT}
+   */
+  private void validateWriteSizeLimitConfig() {
+    int serializerSize = ZNRecordUtil.getSerializerWriteSizeLimit();
+    LOG.info("ZNRecord serializer write size limit: {}; ZkClient write size limit: {}",
+        serializerSize, WRITE_SIZE_LIMIT);
+    if (serializerSize > WRITE_SIZE_LIMIT) {
+      throw new IllegalStateException("ZNRecord serializer write size limit " + serializerSize
+          + " is greater than ZkClient size limit " + WRITE_SIZE_LIMIT);
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
index c8f8e81..324e86f 100644
--- a/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
@@ -148,7 +148,7 @@ public final class ZNRecordUtil {
 
   /**
    * Returns ZNRecord serializer write size limit in bytes. If size limit is configured to be less
-   * than or equal to 0, the default value will be used instead.
+   * than or equal to 0, the default value {@link ZNRecord#SIZE_LIMIT} will be used instead.
    */
   public static int getSerializerWriteSizeLimit() {
     Integer writeSizeLimit =
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index fa93a72..e54e731 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -776,6 +776,14 @@ public class TestHelper {
     return sb.toString();
   }
 
+  /**
+   * Puts a system property back to a previously captured value.
+   *
+   * @param key name of the system property to restore
+   * @param originValue value to set the property to; when {@code null} the property is cleared
+   *          instead of being set
+   */
+  public static void resetSystemProperty(String key, String originValue) {
+    if (originValue == null) {
+      System.clearProperty(key);
+    } else {
+      System.setProperty(key, originValue);
+    }
+  }
+
   public static interface Verifier {
     boolean verify() throws Exception;
   }
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index 1cc2f2f..8f2c4cf 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkServer;
 import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.helix.HelixException;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
@@ -403,12 +404,9 @@ public class TestRawZkClient extends ZkUnitTestBase {
       Assert.assertEquals(setDataCallbackHandler.getRc(), KeeperException.Code.MarshallingError);
       Assert.assertEquals(zkClient.readData("/tmp/async"), normalZNRecord);
     } finally {
-      if (originSizeLimit == null) {
-        System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
-      } else {
-        System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-            originSizeLimit);
-      }
+      TestHelper
+          .resetSystemProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+              originSizeLimit);
       zkClient.delete("/tmp/async");
       zkClient.delete("/tmp/asyncOversize");
     }
@@ -462,4 +460,46 @@ public class TestRawZkClient extends ZkUnitTestBase {
     }
     System.out.println("End test: " + methodName);
   }
+
+  // Expects createPersistent to fail with a HelixException when given a 1001 * 1024 byte array;
+  // the expected message pins the write size limit at 1024000 bytes.
+  // NOTE(review): presumably relies on the default limit being in effect (jute.maxbuffer unset)
+  // - confirm.
+  @Test(expectedExceptions = HelixException.class,
+      expectedExceptionsMessageRegExp = "Data size of path .* is greater than write size limit 1024000 bytes")
+  public void testDataSizeGreaterThanLimit() {
+    // Creating should fail because size is greater than limit.
+    _zkClient.createPersistent("/" + TestHelper.getTestMethodName(), new byte[1001 * 1024]);
+  }
+
+  // Verifies that a 1024-byte payload can be created at a path that did not exist before.
+  @Test
+  public void testDataSizeLessThanLimit() throws Exception {
+    String path = "/" + TestHelper.getTestMethodName();
+    Assert.assertFalse(_zkClient.exists(path));
+    // Creating znode is successful.
+    _zkClient.createPersistent(path, new byte[1024]);
+
+    Assert.assertTrue(_zkClient.exists(path));
+
+    // Clean up the znode created above.
+    // NOTE(review): whatever TestHelper.verify returns is discarded here, so a delete that
+    // never succeeds would go unnoticed - confirm this is intended.
+    TestHelper.verify(() -> _zkClient.delete(path), TestHelper.WAIT_DURATION);
+  }
+
+  // Tests that constructing a ZkClient fails with IllegalStateException when the ZNRecord
+  // serializer write size limit is configured greater than the size limit in ZkClient.
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp =
+      "ZNRecord serializer write size limit .* is greater than ZkClient size limit .*")
+  public void testInvalidWriteSizeLimitConfig() {
+    // Remember the current property value so the finally block can restore it.
+    String originSerializerLimit =
+        System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+    // Configure the serializer limit 1024 bytes above ZNRecord.SIZE_LIMIT.
+    System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+        String.valueOf(ZNRecord.SIZE_LIMIT + 1024));
+
+    ZkClient zkClient = null;
+    try {
+      // Constructing ZkClient should throw exception because of invalid write size limit config
+      zkClient = new ZkClient(ZkTestBase.ZK_ADDR);
+    } finally {
+      TestHelper
+          .resetSystemProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+              originSerializerLimit);
+      // zkClient is only non-null if the constructor did not throw; close it in that case.
+      if (zkClient != null) {
+        zkClient.close();
+      }
+    }
+  }
 }