You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/11/16 23:29:04 UTC
[helix] branch helix-0.9.x updated: Backport: Validate data write size limit in ZkClient #1072
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch helix-0.9.x
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-0.9.x by this push:
new 982c358 Backport: Validate data write size limit in ZkClient #1072
982c358 is described below
commit 982c35867893985768ac4a6de86e1e64b5095e1f
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Tue Nov 16 15:28:43 2021 -0800
Backport: Validate data write size limit in ZkClient #1072
---
.../java/org/apache/helix/SystemPropertyKeys.java | 4 ++
.../helix/manager/zk/zookeeper/ZkClient.java | 40 +++++++++++++----
.../java/org/apache/helix/util/ZNRecordUtil.java | 2 +-
.../src/test/java/org/apache/helix/TestHelper.java | 8 ++++
.../apache/helix/manager/zk/TestRawZkClient.java | 52 +++++++++++++++++++---
5 files changed, 91 insertions(+), 15 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
index e2ce8e0..1288484 100644
--- a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -64,6 +64,10 @@ public class SystemPropertyKeys {
// Indicate monitoring level of the HelixManager metrics
public static final String MONITOR_LEVEL = "helixmanager.monitorLevel";
+
+ /** System property key for jute.maxbuffer */
+ public static final String JUTE_MAXBUFFER = "jute.maxbuffer";
+
// CallbackHandler
public static final String ASYNC_BATCH_MODE_ENABLED = "helix.callbackhandler.isAsyncBatchModeEnabled";
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 6f7c8ef..5196357 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -40,6 +40,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.HelixException;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.manager.zk.BasicZkSerializer;
@@ -48,6 +49,7 @@ import org.apache.helix.manager.zk.ZkAsyncCallbacks;
import org.apache.helix.manager.zk.zookeeper.ZkEventThread.ZkEvent;
import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
import org.apache.helix.util.ExponentialBackoffStrategy;
+import org.apache.helix.util.ZNRecordUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -81,6 +83,11 @@ public class ZkClient implements Watcher {
// TODO: remove it once we have a better way to exit retry for this case
private static final int NUM_CHILDREN_LIMIT = 100 * 1000;
+ // ZNode write size limit in bytes.
+ // TODO: use ZKConfig#JUTE_MAXBUFFER once bumping up ZK to 3.5.2+
+ private static final int WRITE_SIZE_LIMIT =
+ Integer.getInteger(SystemPropertyKeys.JUTE_MAXBUFFER, ZNRecord.SIZE_LIMIT);
+
private final IZkConnection _connection;
private final long _operationRetryTimeoutInMillis;
private final Map<String, Set<IZkChildListener>> _childListener = new ConcurrentHashMap<>();
@@ -179,6 +186,9 @@ public class ZkClient implements Watcher {
if (zkConnection == null) {
throw new NullPointerException("Zookeeper connection is null!");
}
+
+ validateWriteSizeLimitConfig();
+
_connection = zkConnection;
_pathBasedZkSerializer = zkSerializer;
_operationRetryTimeoutInMillis = operationRetryTimeout;
@@ -538,7 +548,7 @@ public class ZkClient implements Watcher {
long startT = System.currentTimeMillis();
try {
final byte[] data = datat == null ? null : serialize(datat, path);
- checkDataSizeLimit(data);
+ checkDataSizeLimit(path, data);
String actualPath = retryUntilConnected(new Callable<String>() {
@Override
public String call() throws Exception {
@@ -1403,7 +1413,7 @@ public class ZkClient implements Watcher {
long startT = System.currentTimeMillis();
try {
final byte[] data = serialize(datat, path);
- checkDataSizeLimit(data);
+ checkDataSizeLimit(path, data);
final Stat stat = (Stat) retryUntilConnected(new Callable<Object>() {
@Override
public Object call() throws Exception {
@@ -1506,11 +1516,15 @@ public class ZkClient implements Watcher {
});
}
- private void checkDataSizeLimit(byte[] data) {
- if (data != null && data.length > ZNRecord.SIZE_LIMIT) {
- LOG.error("Data size larger than 1M, will not write to zk. Data (first 1k): "
- + new String(data).substring(0, 1024));
- throw new HelixException("Data size larger than 1M");
+ private void checkDataSizeLimit(String path, byte[] data) {
+ if (data == null) {
+ return;
+ }
+
+ if (data.length > WRITE_SIZE_LIMIT) {
+ throw new HelixException("Data size of path " + path
+ + " is greater than write size limit "
+ + WRITE_SIZE_LIMIT + " bytes");
}
}
@@ -1803,7 +1817,7 @@ public class ZkClient implements Watcher {
if (stat.getNumChildren() > NUM_CHILDREN_LIMIT) {
LOG.error("Failed to get children for path {} because of connection loss. "
- + "Number of children {} exceeds limit {}, aborting retry.", path,
+ + "Number of children {} exceeds limit {}, aborting retry.", path, stat.getNumChildren(),
stat.getNumChildren(),
NUM_CHILDREN_LIMIT);
// MarshallingErrorException could represent transport error: exceeding the
@@ -1815,4 +1829,14 @@ public class ZkClient implements Watcher {
stat.getNumChildren(), NUM_CHILDREN_LIMIT);
}
}
+
+ private void validateWriteSizeLimitConfig() {
+ int serializerSize = ZNRecordUtil.getSerializerWriteSizeLimit();
+ LOG.info("ZNRecord serializer write size limit: {}; ZkClient write size limit: {}",
+ serializerSize, WRITE_SIZE_LIMIT);
+ if (serializerSize > WRITE_SIZE_LIMIT) {
+ throw new IllegalStateException("ZNRecord serializer write size limit " + serializerSize
+ + " is greater than ZkClient size limit " + WRITE_SIZE_LIMIT);
+ }
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
index c8f8e81..324e86f 100644
--- a/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/ZNRecordUtil.java
@@ -148,7 +148,7 @@ public final class ZNRecordUtil {
/**
* Returns ZNRecord serializer write size limit in bytes. If size limit is configured to be less
- * than or equal to 0, the default value will be used instead.
+ * than or equal to 0, the default value {@link ZNRecord#SIZE_LIMIT} will be used instead.
*/
public static int getSerializerWriteSizeLimit() {
Integer writeSizeLimit =
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index fa93a72..e54e731 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -776,6 +776,14 @@ public class TestHelper {
return sb.toString();
}
+ public static void resetSystemProperty(String key, String originValue) {
+ if (originValue == null) {
+ System.clearProperty(key);
+ } else {
+ System.setProperty(key, originValue);
+ }
+ }
+
public static interface Verifier {
boolean verify() throws Exception;
}
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index 1cc2f2f..8f2c4cf 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -23,6 +23,7 @@ import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkServer;
import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.helix.HelixException;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
@@ -403,12 +404,9 @@ public class TestRawZkClient extends ZkUnitTestBase {
Assert.assertEquals(setDataCallbackHandler.getRc(), KeeperException.Code.MarshallingError);
Assert.assertEquals(zkClient.readData("/tmp/async"), normalZNRecord);
} finally {
- if (originSizeLimit == null) {
- System.clearProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
- } else {
- System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
- originSizeLimit);
- }
+ TestHelper
+ .resetSystemProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ originSizeLimit);
zkClient.delete("/tmp/async");
zkClient.delete("/tmp/asyncOversize");
}
@@ -462,4 +460,46 @@ public class TestRawZkClient extends ZkUnitTestBase {
}
System.out.println("End test: " + methodName);
}
+
+ @Test(expectedExceptions = HelixException.class,
+ expectedExceptionsMessageRegExp = "Data size of path .* is greater than write size limit 1024000 bytes")
+ public void testDataSizeGreaterThanLimit() {
+ // Creating should fail because size is greater than limit.
+ _zkClient.createPersistent("/" + TestHelper.getTestMethodName(), new byte[1001 * 1024]);
+ }
+
+ @Test
+ public void testDataSizeLessThanLimit() throws Exception {
+ String path = "/" + TestHelper.getTestMethodName();
+ Assert.assertFalse(_zkClient.exists(path));
+ // Creating znode is successful.
+ _zkClient.createPersistent(path, new byte[1024]);
+
+ Assert.assertTrue(_zkClient.exists(path));
+
+ TestHelper.verify(() -> _zkClient.delete(path), TestHelper.WAIT_DURATION);
+ }
+
+ // Tests znrecord serializer write size limit is invalid: greater than size limit in ZkClient
+ @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp =
+ "ZNRecord serializer write size limit .* is greater than ZkClient size limit .*")
+ public void testInvalidWriteSizeLimitConfig() {
+ String originSerializerLimit =
+ System.getProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+ System.setProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ String.valueOf(ZNRecord.SIZE_LIMIT + 1024));
+
+ ZkClient zkClient = null;
+ try {
+ // Constructing ZkClient should throw exception because of invalid write size limit config
+ zkClient = new ZkClient(ZkTestBase.ZK_ADDR);
+ } finally {
+ TestHelper
+ .resetSystemProperty(SystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
+ originSerializerLimit);
+ if (zkClient != null) {
+ zkClient.close();
+ }
+ }
+ }
}