You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2022/02/02 20:29:14 UTC

[helix] branch master updated: Add new metrics to record ZNRecord compression count. (#1943)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 05cf861  Add new metrics to record ZNRecord compression count. (#1943)
05cf861 is described below

commit 05cf861dfb8578977163015f945b6a152f23e43c
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Wed Feb 2 12:28:14 2022 -0800

    Add new metrics to record ZNRecord compression count. (#1943)
    
    This PR determines whether a ZK write request is compressed by calling GZipCompressionUtil on the serialized data. This is an indirect check and can be inaccurate, so choosing it was a trade-off.
    
    Alternatively, the ZkClientMonitor could be passed into the serializer class, which would then report compressed writes internally. However, this would require changes to multiple serializer interfaces.
    Because the serializer interfaces are defined in multiple layers (PathBasedZkSerializer, ZkSerializer), implementing this alternative would be very costly without major refactoring.
---
 .../helix/integration/TestEnableCompression.java   | 20 ++++++++++-
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 41 ++++++++++------------
 .../callback/ZkAsyncCallMonitorContext.java        | 14 +++++++-
 .../zkclient/callback/ZkAsyncRetryCallContext.java | 11 ++++--
 .../zookeeper/zkclient/metric/ZkClientMonitor.java | 11 ++++--
 5 files changed, 69 insertions(+), 28 deletions(-)

diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
index 67638a6..fcc94e9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
@@ -19,12 +19,15 @@ package org.apache.helix.integration;
  * under the License.
  */
 
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
@@ -32,10 +35,13 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.util.GZipCompressionUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
 import org.apache.helix.zookeeper.zkclient.serialize.BytesPushThroughSerializer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -133,6 +139,18 @@ public class TestEnableCompression extends ZkTestBase {
     Assert.assertTrue(compressedPaths.contains(idealstatePath));
     Assert.assertTrue(compressedPaths.contains(externalViewPath));
 
+    // Validate the compressed ZK nodes count == external view nodes
+    MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
+    ObjectName name =
+        MBeanRegistrar.buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
+            InstanceType.CONTROLLER.name(), ZkClientMonitor.MONITOR_KEY,
+            clusterName + "." + controller.getInstanceName());
+    // The controller ZkClient only writes one compressed node, which is the External View node.
+    long compressCount = (long) beanServer.getAttribute(name, "CompressedZnodeWriteCounter");
+    // Note since external view node is updated in every controller pipeline, there would be multiple compressed writes.
+    // However, the total count won't exceed the external view node version (starts from 0).
+    Assert.assertTrue(compressCount >= 1 && compressCount <= zkClient.getStat(externalViewPath).getVersion() + 1);
+
     // clean up
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index cbbdbbd..be72920 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -33,12 +33,12 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.management.JMException;
-
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
 import org.apache.helix.zookeeper.datamodel.SessionAwareZNRecord;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
+import org.apache.helix.zookeeper.util.GZipCompressionUtil;
 import org.apache.helix.zookeeper.util.ZNRecordUtil;
 import org.apache.helix.zookeeper.zkclient.annotation.PreFetchChangedData;
 import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallMonitorContext;
@@ -1907,12 +1907,8 @@ public class ZkClient implements Watcher {
     try {
       final byte[] data = serialize(datat, path);
       checkDataSizeLimit(path, data);
-      final Stat stat = (Stat) retryUntilConnected(new Callable<Object>() {
-        @Override
-        public Object call() throws Exception {
-          return getConnection().writeDataReturnStat(path, data, expectedVersion);
-        }
-      });
+      final Stat stat = (Stat) retryUntilConnected(
+          (Callable<Object>) () -> getConnection().writeDataReturnStat(path, data, expectedVersion));
       record(path, data, startT, ZkClientMonitor.AccessType.WRITE);
       return stat;
     } catch (Exception e) {
@@ -1945,19 +1941,17 @@ public class ZkClient implements Watcher {
   }
 
   private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode,
-      final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb,
-      final String expectedSessionId) {
+      final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb, final String expectedSessionId) {
     try {
       retryUntilConnected(() -> {
-        getExpectedZookeeper(expectedSessionId)
-            .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, cb,
-                new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false) {
-                  @Override
-                  protected void doRetry() {
-                    doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb,
-                        expectedSessionId);
-                  }
-                });
+        getExpectedZookeeper(expectedSessionId).create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, cb,
+            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false,
+                GZipCompressionUtil.isCompressed(data)) {
+              @Override
+              protected void doRetry() {
+                doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb, expectedSessionId);
+              }
+            });
         return null;
       });
     } catch (RuntimeException e) {
@@ -1988,12 +1982,11 @@ public class ZkClient implements Watcher {
     try {
       retryUntilConnected(() -> {
         getExpectedZookeeper(expectedSessionId).setData(path, data, version, cb,
-            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT,
-                data == null ? 0 : data.length, false) {
+            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, data == null ? 0 : data.length,
+                false, GZipCompressionUtil.isCompressed(data)) {
               @Override
               protected void doRetry() {
-                doAsyncSetData(path, data, version, System.currentTimeMillis(), cb,
-                    expectedSessionId);
+                doAsyncSetData(path, data, version, System.currentTimeMillis(), cb, expectedSessionId);
               }
             });
         return null;
@@ -2447,6 +2440,10 @@ public class ZkClient implements Watcher {
     if (_monitor != null) {
       int dataSize = (data != null) ? data.length : 0;
       _monitor.record(path, dataSize, startTimeMilliSec, accessType);
+
+      if (GZipCompressionUtil.isCompressed(data)) {
+        _monitor.increaseZnodeCompressCounter();
+      }
     }
   }
 
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
index ce691e9..22ae047 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
@@ -25,6 +25,7 @@ public class ZkAsyncCallMonitorContext {
   private final long _startTimeMilliSec;
   private final ZkClientMonitor _monitor;
   private final boolean _isRead;
+  private final boolean _isCompressed;
   private int _bytes;
 
   /**
@@ -32,15 +33,23 @@ public class ZkAsyncCallMonitorContext {
    * @param startTimeMilliSec Operation initialization time.
    * @param bytes             The data size in bytes that is involved in the operation.
    * @param isRead            True if the operation is readonly.
+   * @param isCompressed      True if the data is compressed.
    */
   public ZkAsyncCallMonitorContext(final ZkClientMonitor monitor, long startTimeMilliSec, int bytes,
-      boolean isRead) {
+      boolean isRead, boolean isCompressed) {
     _monitor = monitor;
     _startTimeMilliSec = startTimeMilliSec;
     _bytes = bytes;
     _isRead = isRead;
+    _isCompressed = isCompressed;
+  }
+
+  public ZkAsyncCallMonitorContext(final ZkClientMonitor monitor, long startTimeMilliSec, int bytes,
+      boolean isRead) {
+    this(monitor, startTimeMilliSec, bytes, isRead, false);
   }
 
+
   /**
    * Update the operated data size in bytes.
    * @param bytes
@@ -59,6 +68,9 @@ public class ZkAsyncCallMonitorContext {
         _monitor.recordAsync(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.READ);
       } else {
         _monitor.recordAsync(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.WRITE);
+        if (_isCompressed) {
+          _monitor.increaseZnodeCompressCounter();
+        }
       }
     }
   }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java
index d444dda..6ac6726 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java
@@ -36,15 +36,22 @@ public abstract class ZkAsyncRetryCallContext extends ZkAsyncCallMonitorContext
    * @param startTimeMilliSec Operation initialization time.
    * @param bytes             The data size in bytes that is involved in the operation.
    * @param isRead            True if the operation is readonly.
+   * @param isCompressed      True if the data is compressed.
    */
   public ZkAsyncRetryCallContext(final ZkAsyncRetryThread retryThread,
       final CancellableZkAsyncCallback callback, final ZkClientMonitor monitor,
-      long startTimeMilliSec, int bytes, boolean isRead) {
-    super(monitor, startTimeMilliSec, bytes, isRead);
+      long startTimeMilliSec, int bytes, boolean isRead, boolean isCompressed) {
+    super(monitor, startTimeMilliSec, bytes, isRead, isCompressed);
     _retryThread = retryThread;
     _cancellableCallback = callback;
   }
 
+  public ZkAsyncRetryCallContext(final ZkAsyncRetryThread retryThread,
+      final CancellableZkAsyncCallback callback, final ZkClientMonitor monitor,
+      long startTimeMilliSec, int bytes, boolean isRead) {
+    this(retryThread, callback, monitor, startTimeMilliSec, bytes, isRead, false);
+  }
+
   /**
    * Request a retry.
    *
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
index 4ce298f..1f71e42 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
@@ -38,8 +38,6 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 import org.apache.helix.monitoring.mbeans.exception.MetricException;
 import org.apache.helix.zookeeper.zkclient.ZkEventThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class ZkClientMonitor extends DynamicMBeanProvider {
@@ -62,6 +60,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
   private SimpleDynamicMetric<Long> _expiredSessionCounter;
   private SimpleDynamicMetric<Long> _dataChangeEventCounter;
   private SimpleDynamicMetric<Long> _outstandingRequestGauge;
+  private SimpleDynamicMetric<Long> _znodeCompressCounter;
 
   private ZkThreadMetric _zkEventThreadMetric;
 
@@ -86,6 +85,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
     _expiredSessionCounter = new SimpleDynamicMetric("ExpiredSessionCounter", 0l);
     _dataChangeEventCounter = new SimpleDynamicMetric("DataChangeEventCounter", 0l);
     _outstandingRequestGauge = new SimpleDynamicMetric("OutstandingRequestGauge", 0l);
+    _znodeCompressCounter = new SimpleDynamicMetric("CompressedZnodeWriteCounter", 0l);
 
     if (zkEventThread != null) {
       boolean result = setAndInitZkEventThreadMonitor(zkEventThread);
@@ -128,6 +128,7 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
     attributeList.add(_outstandingRequestGauge);
     attributeList.add(_stateChangeEventCounter);
     attributeList.add(_expiredSessionCounter);
+    attributeList.add(_znodeCompressCounter);
     if (_zkEventThreadMetric != null) {
       attributeList.add(_zkEventThreadMetric);
     }
@@ -190,6 +191,12 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
     }
   }
 
+  public void increaseZnodeCompressCounter() {
+    synchronized (_znodeCompressCounter) {
+      _znodeCompressCounter.updateValue(_znodeCompressCounter.getValue() + 1);
+    }
+  }
+
   public void recordDataPropagationLatency(String path, long latencyMilliSec) {
     if (null == path) {
       return;