You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/11/04 23:36:28 UTC

[helix] branch master updated: Add additional ZK serializer configuration to active ZNRecord compression even the node size is smaller than write size limit.

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c5e9fc  Add additional ZK serializer configuration to active ZNRecord compression even the node size is smaller than write size limit.
9c5e9fc is described below

commit 9c5e9fc9aafed8c65d8d93de4b25ed76d42201ea
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Thu Nov 4 11:52:05 2021 -0700

    Add additional ZK serializer configuration to active ZNRecord compression even the node size is smaller than write size limit.
    
    The property zk.serializer.znrecord.auto-compress.threshold.bytes defines a threshold of ZNRecord size in bytes that the ZK serializer starts to auto compress the ZNRecord for write requests if it's size exceeds the threshold.
    If the threshold is not configured or exceed ZKRecord write size limit, default value zk.serializer.znrecord.write.size.limit.bytes (if configured) or 1MB (if no configuration) will be applied.
---
 .../zookeeper/constant/ZkSystemPropertyKeys.java   |  9 +++
 .../apache/helix/zookeeper/util/ZNRecordUtil.java  | 20 +++++-
 .../TestZNRecordSerializeWriteSizeLimit.java       | 84 +++++++++++++---------
 3 files changed, 78 insertions(+), 35 deletions(-)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
index 1121554..1c6bf41 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
@@ -37,6 +37,15 @@ public class ZkSystemPropertyKeys {
       "zk.serializer.znrecord.auto-compress.enabled";
 
   /**
+   * This property defines a threshold of ZNRecord size in bytes that the ZK serializer starts to auto compress
+   * the ZNRecord for write requests if it's size exceeds the threshold.
+   * If the threshold is not configured or exceed ZKRecord write size limit, default value
+   * {@value ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES} will be applied.
+   */
+  public static final String ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES =
+      "zk.serializer.znrecord.auto-compress.threshold.bytes";
+
+  /**
    * This is property that defines the maximum write size in bytes for ZKRecord's two serializers
    * before serialized data is ready to be written to ZK. This property applies to
    * 1. {@link org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/ZNRecordUtil.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/ZNRecordUtil.java
index 90a568f..6b55c57 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/ZNRecordUtil.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/util/ZNRecordUtil.java
@@ -45,7 +45,25 @@ public class ZNRecordUtil {
         .getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED,
             ZNRecord.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_DEFAULT));
 
-    return autoCompressEnabled && serializedLength > getSerializerWriteSizeLimit();
+    return autoCompressEnabled && serializedLength > getSerializerCompressThreshold();
+  }
+
+  /**
+   * Returns the threshold in bytes that ZNRecord serializer should compress a ZNRecord with larger size.
+   * If threshold is configured to be less than or equal to 0, the serializer will always compress ZNRecords as long as
+   * auto-compression is enabled.
+   * If threshold is not configured or the threshold is larger than ZNRecord write size limit, the default value
+   * ZNRecord write size limit will be used instead.
+   */
+  private static int getSerializerCompressThreshold() {
+    Integer autoCompressThreshold =
+        Integer.getInteger(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES);
+
+    if (autoCompressThreshold == null || autoCompressThreshold > getSerializerWriteSizeLimit()) {
+      return getSerializerWriteSizeLimit();
+    }
+
+    return autoCompressThreshold;
   }
 
   /**
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/datamodel/serializer/TestZNRecordSerializeWriteSizeLimit.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/datamodel/serializer/TestZNRecordSerializeWriteSizeLimit.java
index fc9fa77..cd03a43 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/datamodel/serializer/TestZNRecordSerializeWriteSizeLimit.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/datamodel/serializer/TestZNRecordSerializeWriteSizeLimit.java
@@ -63,10 +63,10 @@ public class TestZNRecordSerializeWriteSizeLimit {
       Assert.assertFalse(
           Boolean.getBoolean(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED));
       // Data size 300 KB > size limit 200 KB: exception expected.
-      verifyAutoCompression(300, writeSizeLimit, true, false, true);
+      verifyAutoCompression(300, false, true);
 
       // Data size 100 KB < size limit 200 KB: pass
-      verifyAutoCompression(100, writeSizeLimit, false, false, false);
+      verifyAutoCompression(100, false, false);
     } finally {
       // Reset: add the properties back to system properties if they were originally available.
       if (compressionEnabledProperty != null) {
@@ -85,46 +85,60 @@ public class TestZNRecordSerializeWriteSizeLimit {
   }
 
   /*
-   * Tests data serializing when write size limit is set.
+   * Tests data serializing when compress threshold is set.
    * Two cases:
-   * 1. limit is not set
-   * --> default size is used.
-   * 2. limit is set
-   * --> serialized data is checked by the limit: pass or throw exception.
+   * 1. threshold is not set
+   * --> default threshold (write size limit is used): pass or throw exception.
+   * 2. threshold is set
+   * --> serialized data is checked by the threshold.
    */
   @Test
   public void testZNRecordSerializerWriteSizeLimit() {
     // Backup properties for later resetting.
     final String writeSizeLimitProperty =
         System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
+    final String compressThresholdProperty =
+        System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES);
 
     try {
-      // Unset write size limit property so default limit is used.
+      // Unset write size limit and compress threshold properties so default limit is used.
       System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
-
+      System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES);
       Assert.assertNull(
           System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES));
+      Assert.assertNull(
+          System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES));
+      verifyAutoCompression(600, false, false);
 
-      verifyAutoCompression(500, ZNRecord.SIZE_LIMIT, false, false, false);
-
-      // 2. Set size limit so serialized data is greater than the size limit but compressed data
-      // is smaller than the size limit.
-      // Set it to 2000 bytes
-      int writeSizeLimit = 2000;
-      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
-          String.valueOf(writeSizeLimit));
-
-      // Verify auto compression is done.
-      verifyAutoCompression(200, writeSizeLimit, true, true, false);
-
-      // 3. Set size limit to be be less than default value. The default value will be used for write
-      // size limit.
-      writeSizeLimit = 2000;
+      // 1. Set size limit but no compression threshold.
+      final int writeSizeLimit = 200 * 1024;
       System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
           String.valueOf(writeSizeLimit));
+      // Data size 100 KB < size limit 200 KB: directly write without compression.
+      verifyAutoCompression(100, false, false);
+      // Data size 300 KB > size limit 200 KB: compress expected.
+      verifyAutoCompression(300, true, false);
 
-      // Verify exception is thrown because compressed data is larger than size limit.
-      verifyAutoCompression(1000, writeSizeLimit, true, true, true);
+      // 2. Set threshold so serialized compressed data even the size is smaller than the size limit.
+      // Set it to 2000 bytes
+      final int threshold = 100 * 1024;
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES,
+          String.valueOf(threshold));
+      // Data size 150 KB < size limit 200 KB and > threshold 100 KB: compress expected.
+      verifyAutoCompression(150, true, false);
+
+      // 3. Set threshold to be larger than the write size limit. The default value will be used.
+      int doubleLimitThreshold = writeSizeLimit * 2;
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES,
+          String.valueOf(doubleLimitThreshold));
+      // Data size 250 KB > size limit 200 KB but < threshold 400 KB: compress is still expected.
+      verifyAutoCompression(250, true, false);
+
+      // 4. Set threshold to zero so compression will always be done.
+      System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES,
+          String.valueOf(0));
+      // Data size 250 KB > size limit 200 KB but < threshold 400 KB: compress is still expected.
+      verifyAutoCompression(5, true, false);
     } finally {
       // Reset: add the properties back to system properties if they were originally available.
       if (writeSizeLimitProperty != null) {
@@ -133,37 +147,39 @@ public class TestZNRecordSerializeWriteSizeLimit {
       } else {
         System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
       }
+      if (compressThresholdProperty != null) {
+        System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES,
+            compressThresholdProperty);
+      } else {
+        System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_THRESHOLD_BYTES);
+      }
     }
   }
 
-  private void verifyAutoCompression(int recordSize, int limit, boolean greaterThanThreshold,
-      boolean compressionExpected, boolean exceptionExpected) {
+  private void verifyAutoCompression(int recordSize, boolean compressionExpected, boolean exceptionExpected) {
     ZNRecord record = createZNRecord(recordSize);
 
     // Makes sure the length of serialized bytes is greater than limit to
     // satisfy the condition: serialized bytes' length exceeds the limit.
     byte[] preCompressedBytes = serialize(record);
-
-    Assert.assertEquals(preCompressedBytes.length > limit, greaterThanThreshold);
+    Assert.assertTrue(preCompressedBytes.length >= recordSize);
 
     ZkSerializer zkSerializer = new ZNRecordSerializer();
 
     byte[] bytes;
     try {
       bytes = zkSerializer.serialize(record);
-
-      Assert.assertEquals(bytes.length >= limit, exceptionExpected);
       Assert.assertFalse(exceptionExpected);
     } catch (ZkMarshallingError e) {
       Assert.assertTrue(exceptionExpected, "Should not throw exception.");
-      Assert.assertTrue(e.getMessage().contains(" is greater than " + limit + " bytes"));
       // No need to verify following asserts as bytes data is not returned.
       return;
     }
 
     // Verify whether serialized data is compressed or not.
     Assert.assertEquals(GZipCompressionUtil.isCompressed(bytes), compressionExpected);
-    Assert.assertEquals(preCompressedBytes.length != bytes.length, compressionExpected);
+    Assert.assertEquals(preCompressedBytes.length > bytes.length, compressionExpected,
+        "Compressed data should have a smaller size compared with the original data.");
 
     // Verify serialized bytes could correctly deserialize.
     Assert.assertEquals(zkSerializer.deserialize(bytes), record);