You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/06/24 23:49:24 UTC

[helix] 02/15: Adding Zk data change callback propagation latency metric.

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 112587033881387b75ea298ba80fce3f9e67d5fe
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Tue May 21 19:01:16 2019 -0700

    Adding Zk data change callback propagation latency metric.
    
    Note that the latency metric only covers data change callback for now.
    To add the child change callback as well, we need to find a way to avoid the additional ZK access that is required to read the children nodes' stats. Added a TODO in the corresponding code block.
    
    RB=1674550
    G=helix-reviewers
    A=jxue,ksun
    
    Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
 .../helix/manager/zk/zookeeper/ZkClient.java       | 39 ++++++++++++----
 .../helix/monitoring/mbeans/ZkClientMonitor.java   | 33 +++++++++----
 .../monitoring/mbeans/ZkClientPathMonitor.java     | 54 ++++++++++++++++------
 .../apache/helix/manager/zk/TestRawZkClient.java   | 27 +++++++++--
 .../monitoring/mbeans/TestZkClientMonitor.java     | 16 ++++++-
 5 files changed, 129 insertions(+), 40 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 8718303..418140b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -17,6 +17,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -131,9 +132,14 @@ public class ZkClient implements Watcher {
   }
 
   private class ZkPathStatRecord {
+    private final String _path;
     private Stat _stat = null;
     private boolean _checked = false;
 
+    public ZkPathStatRecord(String path) {
+      _path = path;
+    }
+
     public boolean pathExists() {
       return _stat != null;
     }
@@ -142,9 +148,19 @@ public class ZkClient implements Watcher {
       return _checked;
     }
 
-    public void recordPathStat(Stat stat) {
+    /*
+     * Note this method is not thread safe.
+     */
+    public void recordPathStat(Stat stat, OptionalLong notificationTime) {
       _checked = true;
       _stat = stat;
+
+      if (_monitor != null && stat != null && notificationTime.isPresent()) {
+        long updateTime = Math.max(stat.getCtime(), stat.getMtime());
+        if (notificationTime.getAsLong() > updateTime) {
+          _monitor.recordDataPropagationLatency(_path, notificationTime.getAsLong() - updateTime);
+        } // else, the node was updated again after the notification. Propagation latency is unavailable.
+      }
     }
   }
 
@@ -628,6 +644,7 @@ public class ZkClient implements Watcher {
 
   @Override
   public void process(WatchedEvent event) {
+    long notificationTime = System.currentTimeMillis();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Received event: " + event);
     }
@@ -662,7 +679,7 @@ public class ZkClient implements Watcher {
         processStateChanged(event);
       }
       if (dataChanged) {
-        processDataOrChildChange(event);
+        processDataOrChildChange(event, notificationTime);
       }
     } finally {
       if (stateChanged) {
@@ -701,7 +718,7 @@ public class ZkClient implements Watcher {
       fireChildChangedEvents(entry.getKey(), entry.getValue());
     }
     for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet()) {
-      fireDataChangedEvents(entry.getKey(), entry.getValue());
+      fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty());
     }
   }
 
@@ -943,13 +960,14 @@ public class ZkClient implements Watcher {
     }
   }
 
-  private void processDataOrChildChange(WatchedEvent event) {
+  private void processDataOrChildChange(WatchedEvent event, long notificationTime) {
     final String path = event.getPath();
 
     if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated
         || event.getType() == EventType.NodeDeleted) {
       Set<IZkChildListener> childListeners = _childListener.get(path);
       if (childListeners != null && !childListeners.isEmpty()) {
+        // TODO recording child changed event propagation latency as well. Note this change will introduce additional ZK access.
         fireChildChangedEvents(path, childListeners);
       }
     }
@@ -958,14 +976,15 @@ public class ZkClient implements Watcher {
         || event.getType() == EventType.NodeCreated) {
       Set<IZkDataListenerEntry> listeners = _dataListener.get(path);
       if (listeners != null && !listeners.isEmpty()) {
-        fireDataChangedEvents(event.getPath(), listeners);
+        fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime));
       }
     }
   }
 
-  private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners) {
+  private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners,
+      final OptionalLong notificationTime) {
     try {
-      final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord();
+      final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
       // Trigger listener callbacks
       for (final IZkDataListenerEntry listener : listeners) {
         _eventThread.send(new ZkEvent(
@@ -975,7 +994,7 @@ public class ZkClient implements Watcher {
           public void run() throws Exception {
             // Reinstall watch before listener callbacks to check the znode status
             if (!pathStatRecord.pathChecked()) {
-              pathStatRecord.recordPathStat(getStat(path, true));
+              pathStatRecord.recordPathStat(getStat(path, true), notificationTime);
             }
             if (!pathStatRecord.pathExists()) {
               // no znode found at the path, trigger data deleted handler.
@@ -1006,14 +1025,14 @@ public class ZkClient implements Watcher {
 
   private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners) {
     try {
-      final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord();
+      final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
       for (final IZkChildListener listener : childListeners) {
         _eventThread.send(new ZkEvent("Children of " + path + " changed sent to " + listener) {
           @Override
           public void run() throws Exception {
             // Reinstall watch before listener callbacks to check the znode status
             if (!pathStatRecord.pathChecked()) {
-              pathStatRecord.recordPathStat(getStat(path, hasListeners(path)));
+              pathStatRecord.recordPathStat(getStat(path, hasListeners(path)), OptionalLong.empty());
             }
             List<String> children = null;
             if (pathStatRecord.pathExists()) {
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
index 0fe6911..88441be 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
@@ -24,6 +24,7 @@ import javax.management.MBeanAttributeInfo;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -152,17 +153,33 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
     }
   }
 
+  public void recordDataPropagationLatency(String path, long latencyMilliSec) {
+    if (null == path) {
+      return;
+    }
+    Arrays.stream(ZkClientPathMonitor.PredefinedPath.values())
+        .filter(predefinedPath -> predefinedPath.match(path))
+        .forEach(predefinedPath -> {
+      ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath);
+      if (zkClientPathMonitor != null) {
+        zkClientPathMonitor.recordDataPropagationLatency(latencyMilliSec);
+      }
+    });
+  }
+
   private void record(String path, int bytes, long latencyMilliSec, boolean isFailure,
       boolean isRead) {
-    for (ZkClientPathMonitor.PredefinedPath predefinedPath : ZkClientPathMonitor.PredefinedPath
-        .values()) {
-      if (predefinedPath.match(path)) {
-        ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath);
-        if (zkClientPathMonitor != null) {
-          zkClientPathMonitor.record(bytes, latencyMilliSec, isFailure, isRead);
-        }
-      }
+    if (null == path) {
+      return;
     }
+    Arrays.stream(ZkClientPathMonitor.PredefinedPath.values())
+        .filter(predefinedPath -> predefinedPath.match(path))
+        .forEach(predefinedPath -> {
+      ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath);
+      if (zkClientPathMonitor != null) {
+        zkClientPathMonitor.record(bytes, latencyMilliSec, isFailure, isRead);
+      }
+    });
   }
 
   public void record(String path, int dataSize, long startTimeMilliSec, AccessType accessType) {
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
index 5f4778c..6cd0cd5 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
@@ -75,7 +75,11 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
     ReadLatencyGauge,
     WriteLatencyGauge,
     ReadBytesGauge,
-    WriteBytesGauge
+    WriteBytesGauge,
+    /*
+     * The latency between a ZK data change happening in the server side and the client side getting notification.
+     */
+    DataPropagationLatencyGuage
   }
 
   private SimpleDynamicMetric<Long> _readCounter;
@@ -91,6 +95,7 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
   private HistogramDynamicMetric _writeLatencyGauge;
   private HistogramDynamicMetric _readBytesGauge;
   private HistogramDynamicMetric _writeBytesGauge;
+  private HistogramDynamicMetric _dataPropagationLatencyGauge;
 
   @Override
   public String getSensorName() {
@@ -107,23 +112,37 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
         .format("%s.%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), monitorType, monitorKey,
             path.name());
 
-    _writeTotalLatencyCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteTotalLatencyCounter.name(), 0l);
-    _readTotalLatencyCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadTotalLatencyCounter.name(), 0l);
-    _writeFailureCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteFailureCounter.name(), 0l);
-    _readFailureCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadFailureCounter.name(), 0l);
-    _writeBytesCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteBytesCounter.name(), 0l);
-    _readBytesCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadBytesCounter.name(), 0l);
+    _writeTotalLatencyCounter =
+        new SimpleDynamicMetric(PredefinedMetricDomains.WriteTotalLatencyCounter.name(), 0l);
+    _readTotalLatencyCounter =
+        new SimpleDynamicMetric(PredefinedMetricDomains.ReadTotalLatencyCounter.name(), 0l);
+    _writeFailureCounter =
+        new SimpleDynamicMetric(PredefinedMetricDomains.WriteFailureCounter.name(), 0l);
+    _readFailureCounter =
+        new SimpleDynamicMetric(PredefinedMetricDomains.ReadFailureCounter.name(), 0l);
+    _writeBytesCounter =
+        new SimpleDynamicMetric(PredefinedMetricDomains.WriteBytesCounter.name(), 0l);
+    _readBytesCounter =
+        new SimpleDynamicMetric(PredefinedMetricDomains.ReadBytesCounter.name(), 0l);
     _writeCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteCounter.name(), 0l);
     _readCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadCounter.name(), 0l);
 
-    _readLatencyGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadLatencyGauge.name(), new Histogram(
-        new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
-    _writeLatencyGauge = new HistogramDynamicMetric(PredefinedMetricDomains.WriteLatencyGauge.name(), new Histogram(
-        new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
-    _readBytesGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadBytesGauge.name(), new Histogram(
-        new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
-    _writeBytesGauge = new HistogramDynamicMetric(PredefinedMetricDomains.WriteBytesGauge.name(), new Histogram(
-        new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _readLatencyGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadLatencyGauge.name(),
+        new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _writeLatencyGauge =
+        new HistogramDynamicMetric(PredefinedMetricDomains.WriteLatencyGauge.name(), new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _readBytesGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadBytesGauge.name(),
+        new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _writeBytesGauge = new HistogramDynamicMetric(PredefinedMetricDomains.WriteBytesGauge.name(),
+        new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _dataPropagationLatencyGauge =
+        new HistogramDynamicMetric(PredefinedMetricDomains.DataPropagationLatencyGuage.name(),
+            new Histogram(new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS,
+                TimeUnit.MILLISECONDS)));
   }
 
   public ZkClientPathMonitor register() throws JMException {
@@ -140,6 +159,7 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
     attributeList.add(_writeLatencyGauge);
     attributeList.add(_readBytesGauge);
     attributeList.add(_writeBytesGauge);
+    attributeList.add(_dataPropagationLatencyGauge);
 
     ObjectName objectName = new ObjectName(String
         .format("%s,%s=%s", ZkClientMonitor.getObjectName(_type, _key, _instanceName).toString(),
@@ -162,6 +182,10 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
     }
   }
 
+  public void recordDataPropagationLatency(long latency) {
+    _dataPropagationLatencyGauge.updateValue(latency);
+  }
+
   private void increaseFailureCounter(boolean isRead) {
     if (isRead) {
       _readFailureCounter.updateValue(_readFailureCounter.getValue() + 1);
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index edbed63..ea67fbf 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -19,6 +19,8 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -26,8 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkServer;
@@ -40,6 +41,7 @@ import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
 import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
@@ -59,7 +61,6 @@ public class TestRawZkClient extends ZkUnitTestBase {
   @BeforeClass
   public void beforeClass() {
     _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
   }
 
   @AfterClass
@@ -86,7 +87,7 @@ public class TestRawZkClient extends ZkUnitTestBase {
     newStat = _zkClient.getStat(path);
     AssertJUnit.assertEquals(stat, newStat);
 
-    _zkClient.writeData(path, new ZNRecord("Test"));
+    _zkClient.writeData(path, "Test");
     newStat = _zkClient.getStat(path);
     AssertJUnit.assertNotSame(stat, newStat);
   }
@@ -265,10 +266,15 @@ public class TestRawZkClient extends ZkUnitTestBase {
     zkClient.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
       @Override
       public void handleDataChange(String dataPath, Object data) {
+        callbackLock();
       }
 
       @Override
       public void handleDataDeleted(String dataPath) {
+        callbackLock();
+      }
+
+      private void callbackLock() {
         lock.lock();
         try {
           callbackFinish.signal();
@@ -278,13 +284,24 @@ public class TestRawZkClient extends ZkUnitTestBase {
       }
     });
     lock.lock();
-    _zkClient.delete(TEST_PATH);
+    _zkClient.writeData(TEST_PATH, "Test");
     Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
     Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 1);
     Assert.assertEquals((long) beanServer.getAttribute(name, "OutstandingRequestGauge"), 0);
     Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackCounter"), 1);
     Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackHandledCounter"), 1);
     Assert.assertEquals((long) beanServer.getAttribute(name, "PendingCallbackGauge"), 0);
+
+    // Simulate a delayed callback
+    int waitTime = 10;
+    Thread.sleep(waitTime);
+    lock.lock();
+    zkClient.process(new WatchedEvent(Watcher.Event.EventType.NodeDataChanged, null, TEST_PATH));
+    Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
+    Assert.assertTrue(
+        (long) beanServer.getAttribute(rootname, "DataPropagationLatencyGuage.Max") >= waitTime);
+
+    _zkClient.delete(TEST_PATH);
   }
 
   @Test(dependsOnMethods = "testZkClientMonitor")
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
index 988ef60..9c04913 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
@@ -19,10 +19,12 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
-import javax.management.*;
 
-import org.apache.helix.manager.zk.zookeeper.ZkEventThread;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -138,5 +140,15 @@ public class TestZkClientMonitor {
         .assertTrue((long) _beanServer.getAttribute(instancesName, "WriteLatencyGauge.Max") >= 10);
     Assert.assertTrue(
         (long) _beanServer.getAttribute(instancesName, "WriteTotalLatencyCounter") >= 10);
+
+    monitor
+        .recordDataPropagationLatency("TEST/INSTANCES/node_1/CURRENTSTATES/session_1/Resource", 5);
+    Assert
+        .assertEquals((long) _beanServer.getAttribute(rootName, "DataPropagationLatencyGuage.Max"), 5);
+    Assert.assertEquals(
+        (long) _beanServer.getAttribute(currentStateName, "DataPropagationLatencyGuage.Max"), 5);
+    Assert
+        .assertEquals((long) _beanServer.getAttribute(idealStateName, "DataPropagationLatencyGuage.Max"),
+            0);
   }
 }