You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/06/24 23:49:24 UTC
[helix] 02/15: Adding Zk data change callback propagation latency metric.
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 112587033881387b75ea298ba80fce3f9e67d5fe
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Tue May 21 19:01:16 2019 -0700
Adding Zk data change callback propagation latency metric.
Note that the latency metric only covers data change callback for now.
To add the latency metric for child change callbacks, we need to find a way to avoid the additional ZK access that is required to read the children nodes' stats. A TODO has been added in the corresponding code block.
RB=1674550
G=helix-reviewers
A=jxue,ksun
Signed-off-by: Hunter Lee <hu...@linkedin.com>
---
.../helix/manager/zk/zookeeper/ZkClient.java | 39 ++++++++++++----
.../helix/monitoring/mbeans/ZkClientMonitor.java | 33 +++++++++----
.../monitoring/mbeans/ZkClientPathMonitor.java | 54 ++++++++++++++++------
.../apache/helix/manager/zk/TestRawZkClient.java | 27 +++++++++--
.../monitoring/mbeans/TestZkClientMonitor.java | 16 ++++++-
5 files changed, 129 insertions(+), 40 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 8718303..418140b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -17,6 +17,7 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -131,9 +132,14 @@ public class ZkClient implements Watcher {
}
private class ZkPathStatRecord {
+ private final String _path;
private Stat _stat = null;
private boolean _checked = false;
+ public ZkPathStatRecord(String path) {
+ _path = path;
+ }
+
public boolean pathExists() {
return _stat != null;
}
@@ -142,9 +148,19 @@ public class ZkClient implements Watcher {
return _checked;
}
- public void recordPathStat(Stat stat) {
+ /*
+ * Note this method is not thread safe.
+ */
+ public void recordPathStat(Stat stat, OptionalLong notificationTime) {
_checked = true;
_stat = stat;
+
+ if (_monitor != null && stat != null && notificationTime.isPresent()) {
+ long updateTime = Math.max(stat.getCtime(), stat.getMtime());
+ if (notificationTime.getAsLong() > updateTime) {
+ _monitor.recordDataPropagationLatency(_path, notificationTime.getAsLong() - updateTime);
+ } // else, the node was updated again after the notification. Propagation latency is unavailable.
+ }
}
}
@@ -628,6 +644,7 @@ public class ZkClient implements Watcher {
@Override
public void process(WatchedEvent event) {
+ long notificationTime = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("Received event: " + event);
}
@@ -662,7 +679,7 @@ public class ZkClient implements Watcher {
processStateChanged(event);
}
if (dataChanged) {
- processDataOrChildChange(event);
+ processDataOrChildChange(event, notificationTime);
}
} finally {
if (stateChanged) {
@@ -701,7 +718,7 @@ public class ZkClient implements Watcher {
fireChildChangedEvents(entry.getKey(), entry.getValue());
}
for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet()) {
- fireDataChangedEvents(entry.getKey(), entry.getValue());
+ fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty());
}
}
@@ -943,13 +960,14 @@ public class ZkClient implements Watcher {
}
}
- private void processDataOrChildChange(WatchedEvent event) {
+ private void processDataOrChildChange(WatchedEvent event, long notificationTime) {
final String path = event.getPath();
if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated
|| event.getType() == EventType.NodeDeleted) {
Set<IZkChildListener> childListeners = _childListener.get(path);
if (childListeners != null && !childListeners.isEmpty()) {
+ // TODO recording child changed event propagation latency as well. Note this change will introduce additional ZK access.
fireChildChangedEvents(path, childListeners);
}
}
@@ -958,14 +976,15 @@ public class ZkClient implements Watcher {
|| event.getType() == EventType.NodeCreated) {
Set<IZkDataListenerEntry> listeners = _dataListener.get(path);
if (listeners != null && !listeners.isEmpty()) {
- fireDataChangedEvents(event.getPath(), listeners);
+ fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime));
}
}
}
- private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners) {
+ private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners,
+ final OptionalLong notificationTime) {
try {
- final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord();
+ final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
// Trigger listener callbacks
for (final IZkDataListenerEntry listener : listeners) {
_eventThread.send(new ZkEvent(
@@ -975,7 +994,7 @@ public class ZkClient implements Watcher {
public void run() throws Exception {
// Reinstall watch before listener callbacks to check the znode status
if (!pathStatRecord.pathChecked()) {
- pathStatRecord.recordPathStat(getStat(path, true));
+ pathStatRecord.recordPathStat(getStat(path, true), notificationTime);
}
if (!pathStatRecord.pathExists()) {
// no znode found at the path, trigger data deleted handler.
@@ -1006,14 +1025,14 @@ public class ZkClient implements Watcher {
private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners) {
try {
- final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord();
+ final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
for (final IZkChildListener listener : childListeners) {
_eventThread.send(new ZkEvent("Children of " + path + " changed sent to " + listener) {
@Override
public void run() throws Exception {
// Reinstall watch before listener callbacks to check the znode status
if (!pathStatRecord.pathChecked()) {
- pathStatRecord.recordPathStat(getStat(path, hasListeners(path)));
+ pathStatRecord.recordPathStat(getStat(path, hasListeners(path)), OptionalLong.empty());
}
List<String> children = null;
if (pathStatRecord.pathExists()) {
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
index 0fe6911..88441be 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
@@ -24,6 +24,7 @@ import javax.management.MBeanAttributeInfo;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -152,17 +153,33 @@ public class ZkClientMonitor extends DynamicMBeanProvider {
}
}
+ public void recordDataPropagationLatency(String path, long latencyMilliSec) {
+ if (null == path) {
+ return;
+ }
+ Arrays.stream(ZkClientPathMonitor.PredefinedPath.values())
+ .filter(predefinedPath -> predefinedPath.match(path))
+ .forEach(predefinedPath -> {
+ ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath);
+ if (zkClientPathMonitor != null) {
+ zkClientPathMonitor.recordDataPropagationLatency(latencyMilliSec);
+ }
+ });
+ }
+
private void record(String path, int bytes, long latencyMilliSec, boolean isFailure,
boolean isRead) {
- for (ZkClientPathMonitor.PredefinedPath predefinedPath : ZkClientPathMonitor.PredefinedPath
- .values()) {
- if (predefinedPath.match(path)) {
- ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath);
- if (zkClientPathMonitor != null) {
- zkClientPathMonitor.record(bytes, latencyMilliSec, isFailure, isRead);
- }
- }
+ if (null == path) {
+ return;
}
+ Arrays.stream(ZkClientPathMonitor.PredefinedPath.values())
+ .filter(predefinedPath -> predefinedPath.match(path))
+ .forEach(predefinedPath -> {
+ ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath);
+ if (zkClientPathMonitor != null) {
+ zkClientPathMonitor.record(bytes, latencyMilliSec, isFailure, isRead);
+ }
+ });
}
public void record(String path, int dataSize, long startTimeMilliSec, AccessType accessType) {
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
index 5f4778c..6cd0cd5 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
@@ -75,7 +75,11 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
ReadLatencyGauge,
WriteLatencyGauge,
ReadBytesGauge,
- WriteBytesGauge
+ WriteBytesGauge,
+ /*
+ * The latency between a ZK data change happening in the server side and the client side getting notification.
+ */
+ DataPropagationLatencyGuage
}
private SimpleDynamicMetric<Long> _readCounter;
@@ -91,6 +95,7 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
private HistogramDynamicMetric _writeLatencyGauge;
private HistogramDynamicMetric _readBytesGauge;
private HistogramDynamicMetric _writeBytesGauge;
+ private HistogramDynamicMetric _dataPropagationLatencyGauge;
@Override
public String getSensorName() {
@@ -107,23 +112,37 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
.format("%s.%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), monitorType, monitorKey,
path.name());
- _writeTotalLatencyCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteTotalLatencyCounter.name(), 0l);
- _readTotalLatencyCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadTotalLatencyCounter.name(), 0l);
- _writeFailureCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteFailureCounter.name(), 0l);
- _readFailureCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadFailureCounter.name(), 0l);
- _writeBytesCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteBytesCounter.name(), 0l);
- _readBytesCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadBytesCounter.name(), 0l);
+ _writeTotalLatencyCounter =
+ new SimpleDynamicMetric(PredefinedMetricDomains.WriteTotalLatencyCounter.name(), 0l);
+ _readTotalLatencyCounter =
+ new SimpleDynamicMetric(PredefinedMetricDomains.ReadTotalLatencyCounter.name(), 0l);
+ _writeFailureCounter =
+ new SimpleDynamicMetric(PredefinedMetricDomains.WriteFailureCounter.name(), 0l);
+ _readFailureCounter =
+ new SimpleDynamicMetric(PredefinedMetricDomains.ReadFailureCounter.name(), 0l);
+ _writeBytesCounter =
+ new SimpleDynamicMetric(PredefinedMetricDomains.WriteBytesCounter.name(), 0l);
+ _readBytesCounter =
+ new SimpleDynamicMetric(PredefinedMetricDomains.ReadBytesCounter.name(), 0l);
_writeCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteCounter.name(), 0l);
_readCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadCounter.name(), 0l);
- _readLatencyGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadLatencyGauge.name(), new Histogram(
- new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
- _writeLatencyGauge = new HistogramDynamicMetric(PredefinedMetricDomains.WriteLatencyGauge.name(), new Histogram(
- new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
- _readBytesGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadBytesGauge.name(), new Histogram(
- new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
- _writeBytesGauge = new HistogramDynamicMetric(PredefinedMetricDomains.WriteBytesGauge.name(), new Histogram(
- new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _readLatencyGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadLatencyGauge.name(),
+ new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _writeLatencyGauge =
+ new HistogramDynamicMetric(PredefinedMetricDomains.WriteLatencyGauge.name(), new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _readBytesGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadBytesGauge.name(),
+ new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _writeBytesGauge = new HistogramDynamicMetric(PredefinedMetricDomains.WriteBytesGauge.name(),
+ new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _dataPropagationLatencyGauge =
+ new HistogramDynamicMetric(PredefinedMetricDomains.DataPropagationLatencyGuage.name(),
+ new Histogram(new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS,
+ TimeUnit.MILLISECONDS)));
}
public ZkClientPathMonitor register() throws JMException {
@@ -140,6 +159,7 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
attributeList.add(_writeLatencyGauge);
attributeList.add(_readBytesGauge);
attributeList.add(_writeBytesGauge);
+ attributeList.add(_dataPropagationLatencyGauge);
ObjectName objectName = new ObjectName(String
.format("%s,%s=%s", ZkClientMonitor.getObjectName(_type, _key, _instanceName).toString(),
@@ -162,6 +182,10 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
}
}
+ public void recordDataPropagationLatency(long latency) {
+ _dataPropagationLatencyGauge.updateValue(latency);
+ }
+
private void increaseFailureCounter(boolean isRead) {
if (isRead) {
_readFailureCounter.updateValue(_readFailureCounter.getValue() + 1);
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index edbed63..ea67fbf 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -19,6 +19,8 @@ package org.apache.helix.manager.zk;
* under the License.
*/
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -26,8 +28,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkServer;
@@ -40,6 +41,7 @@ import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
@@ -59,7 +61,6 @@ public class TestRawZkClient extends ZkUnitTestBase {
@BeforeClass
public void beforeClass() {
_zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
}
@AfterClass
@@ -86,7 +87,7 @@ public class TestRawZkClient extends ZkUnitTestBase {
newStat = _zkClient.getStat(path);
AssertJUnit.assertEquals(stat, newStat);
- _zkClient.writeData(path, new ZNRecord("Test"));
+ _zkClient.writeData(path, "Test");
newStat = _zkClient.getStat(path);
AssertJUnit.assertNotSame(stat, newStat);
}
@@ -265,10 +266,15 @@ public class TestRawZkClient extends ZkUnitTestBase {
zkClient.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) {
+ callbackLock();
}
@Override
public void handleDataDeleted(String dataPath) {
+ callbackLock();
+ }
+
+ private void callbackLock() {
lock.lock();
try {
callbackFinish.signal();
@@ -278,13 +284,24 @@ public class TestRawZkClient extends ZkUnitTestBase {
}
});
lock.lock();
- _zkClient.delete(TEST_PATH);
+ _zkClient.writeData(TEST_PATH, "Test");
Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 1);
Assert.assertEquals((long) beanServer.getAttribute(name, "OutstandingRequestGauge"), 0);
Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackCounter"), 1);
Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackHandledCounter"), 1);
Assert.assertEquals((long) beanServer.getAttribute(name, "PendingCallbackGauge"), 0);
+
+ // Simulate a delayed callback
+ int waitTime = 10;
+ Thread.sleep(waitTime);
+ lock.lock();
+ zkClient.process(new WatchedEvent(Watcher.Event.EventType.NodeDataChanged, null, TEST_PATH));
+ Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
+ Assert.assertTrue(
+ (long) beanServer.getAttribute(rootname, "DataPropagationLatencyGuage.Max") >= waitTime);
+
+ _zkClient.delete(TEST_PATH);
}
@Test(dependsOnMethods = "testZkClientMonitor")
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
index 988ef60..9c04913 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
@@ -19,10 +19,12 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
-import javax.management.*;
-import org.apache.helix.manager.zk.zookeeper.ZkEventThread;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -138,5 +140,15 @@ public class TestZkClientMonitor {
.assertTrue((long) _beanServer.getAttribute(instancesName, "WriteLatencyGauge.Max") >= 10);
Assert.assertTrue(
(long) _beanServer.getAttribute(instancesName, "WriteTotalLatencyCounter") >= 10);
+
+ monitor
+ .recordDataPropagationLatency("TEST/INSTANCES/node_1/CURRENTSTATES/session_1/Resource", 5);
+ Assert
+ .assertEquals((long) _beanServer.getAttribute(rootName, "DataPropagationLatencyGuage.Max"), 5);
+ Assert.assertEquals(
+ (long) _beanServer.getAttribute(currentStateName, "DataPropagationLatencyGuage.Max"), 5);
+ Assert
+ .assertEquals((long) _beanServer.getAttribute(idealStateName, "DataPropagationLatencyGuage.Max"),
+ 0);
}
}