You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/01/25 21:49:00 UTC
[09/50] [abbrv] helix git commit: use SlidingTimeWindowReservoir for
histogram stats
use SlidingTimeWindowReservoir for histogram stats
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3a73b0f3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3a73b0f3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3a73b0f3
Branch: refs/heads/master
Commit: 3a73b0f30336c4610a501af979d55c4d25344214
Parents: 2f791a6
Author: hrzhang <hr...@linkedin.com>
Authored: Tue Nov 7 11:38:36 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Jan 24 18:31:00 2018 -0800
----------------------------------------------------------------------
.../monitoring/mbeans/ClusterEventMonitor.java | 35 ++++++--
.../monitoring/mbeans/ClusterStatusMonitor.java | 2 +-
.../monitoring/mbeans/HelixCallbackMonitor.java | 20 +++--
.../mbeans/MessageLatencyMonitor.java | 18 ++--
.../monitoring/mbeans/ResourceMonitor.java | 86 +++++++++++---------
.../monitoring/mbeans/ZkClientPathMonitor.java | 58 +++++++------
.../dynamicMBeans/DynamicMBeanProvider.java | 17 +---
.../TestClusterEventStatusMonitor.java | 68 +++++++++++++++-
8 files changed, 201 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
index e7f09ea..8c77466 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
@@ -19,6 +19,9 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
+import java.util.concurrent.TimeUnit;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
@@ -35,20 +38,16 @@ public class ClusterEventMonitor extends DynamicMBeanProvider {
TotalProcessed
}
- private static final long RESET_INTERVAL = 1000 * 60 * 10; // 1 hour
private static final String CLUSTEREVENT_DN_KEY = "ClusterEventStatus";
private static final String EVENT_DN_KEY = "eventName";
private static final String PHASE_DN_KEY = "phaseName";
private final String _phaseName;
- private SimpleDynamicMetric<Long> _totalDuration =
- new SimpleDynamicMetric("TotalDurationCounter", 0l);
- private SimpleDynamicMetric<Long> _maxDuration =
- new SimpleDynamicMetric("MaxSingleDurationGauge", 0l);
- private SimpleDynamicMetric<Long> _count = new SimpleDynamicMetric("EventCounter", 0l);
- private HistogramDynamicMetric _duration = new HistogramDynamicMetric("DurationGauge",
- _metricRegistry.histogram(getMetricName("DurationGauge")));
+ private SimpleDynamicMetric<Long> _totalDuration;
+ private SimpleDynamicMetric<Long> _maxDuration;
+ private SimpleDynamicMetric<Long> _count;
+ private HistogramDynamicMetric _duration;
private long _lastResetTime;
private ClusterStatusMonitor _clusterStatusMonitor;
@@ -56,13 +55,31 @@ public class ClusterEventMonitor extends DynamicMBeanProvider {
public ClusterEventMonitor(ClusterStatusMonitor clusterStatusMonitor, String phaseName) {
_phaseName = phaseName;
_clusterStatusMonitor = clusterStatusMonitor;
+ // DurationGauge histogram statistics cover a sliding window of DEFAULT_RESET_INTERVAL_MS.
+ _duration = new HistogramDynamicMetric("DurationGauge", new Histogram(
+ new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _count = new SimpleDynamicMetric("EventCounter", 0l);
+ _maxDuration = new SimpleDynamicMetric("MaxSingleDurationGauge", 0l);
+ _totalDuration = new SimpleDynamicMetric("TotalDurationCounter", 0l);
+ }
+ // Variant taking a caller-chosen histogram window in ms (e.g. tests use a short window).
+ public ClusterEventMonitor(ClusterStatusMonitor clusterStatusMonitor, String phaseName,
+ int histogramTimeWindowMs) {
+ _phaseName = phaseName;
+ _clusterStatusMonitor = clusterStatusMonitor;
+ // Only the histogram honors histogramTimeWindowMs; the MaxSingleDurationGauge reset in reportDuration still uses DEFAULT_RESET_INTERVAL_MS.
+ _duration = new HistogramDynamicMetric("DurationGauge", new Histogram(
+ new SlidingTimeWindowReservoir(histogramTimeWindowMs, TimeUnit.MILLISECONDS)));
+ _count = new SimpleDynamicMetric("EventCounter", 0l);
+ _maxDuration = new SimpleDynamicMetric("MaxSingleDurationGauge", 0l);
+ _totalDuration = new SimpleDynamicMetric("TotalDurationCounter", 0l);
}
public void reportDuration(long duration) {
_totalDuration.updateValue(_totalDuration.getValue() + duration);
_count.updateValue(_count.getValue() + 1);
_duration.updateValue(duration);
- if (_lastResetTime + RESET_INTERVAL <= System.currentTimeMillis() ||
+ if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis() || // NOTE(review): removed RESET_INTERVAL was 10 min (despite its "1 hour" comment); this is 1 h — confirm intended
duration > _maxDuration.getValue()) {
_maxDuration.updateValue(duration);
_lastResetTime = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index c644d10..61f4ce1 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -69,7 +69,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = new ConcurrentHashMap<>();
// phaseName -> eventMonitor
- private final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMbeanMap =
+ protected final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMbeanMap = // widened from private so subclasses (e.g. test monitors) can inject event monitors
new ConcurrentHashMap<>();
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
index 0dbafb4..0fc2001 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
@@ -19,6 +19,8 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
import org.apache.helix.HelixConstants;
import org.apache.helix.InstanceType;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
@@ -28,6 +30,7 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
import javax.management.JMException;
import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.List;
public class HelixCallbackMonitor extends DynamicMBeanProvider {
@@ -42,14 +45,11 @@ public class HelixCallbackMonitor extends DynamicMBeanProvider {
private final String _clusterName;
private final String _instanceName;
- private SimpleDynamicMetric<Long> _counter = new SimpleDynamicMetric("Counter", 0l);
- private SimpleDynamicMetric<Long> _unbatchedCounter =
- new SimpleDynamicMetric("UnbatchedCounter", 0l);
- private SimpleDynamicMetric<Long> _totalLatencyCounter =
- new SimpleDynamicMetric("LatencyCounter", 0l);
+ private SimpleDynamicMetric<Long> _counter;
+ private SimpleDynamicMetric<Long> _unbatchedCounter;
+ private SimpleDynamicMetric<Long> _totalLatencyCounter;
- private HistogramDynamicMetric _latencyGauge = new HistogramDynamicMetric("LatencyGauge",
- _metricRegistry.histogram(getMetricName("LatencyGauge")));
+ private HistogramDynamicMetric _latencyGauge;
public HelixCallbackMonitor(InstanceType type, String clusterName, String instanceName,
HelixConstants.ChangeType changeType) throws JMException {
@@ -62,6 +62,12 @@ public class HelixCallbackMonitor extends DynamicMBeanProvider {
_sensorName = String
.format("%s.%s.%s.%s", MonitorDomainNames.HelixCallback.name(), type.name(), clusterName,
changeType.name());
+ // LatencyGauge histogram statistics cover a sliding window of DEFAULT_RESET_INTERVAL_MS; counters start at 0.
+ _latencyGauge = new HistogramDynamicMetric("LatencyGauge", new Histogram(
+ new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _totalLatencyCounter = new SimpleDynamicMetric("LatencyCounter", 0l);
+ _unbatchedCounter = new SimpleDynamicMetric("UnbatchedCounter", 0l);
+ _counter = new SimpleDynamicMetric("Counter", 0l);
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
index f1c1039..dac5826 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
@@ -19,6 +19,9 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
+import java.util.concurrent.TimeUnit;
import org.apache.helix.model.Message;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
@@ -35,19 +38,20 @@ public class MessageLatencyMonitor extends DynamicMBeanProvider {
private final String _domainName;
private final String _participantName;
- private SimpleDynamicMetric<Long> _totalMessageCount =
- new SimpleDynamicMetric("TotalMessageCount", 0l);
- private SimpleDynamicMetric<Long> _totalMessageLatency =
- new SimpleDynamicMetric("TotalMessageLatency", 0l);
- private HistogramDynamicMetric _messageLatencyGauge =
- new HistogramDynamicMetric("MessageLatencyGauge",
- _metricRegistry.histogram(getMetricName("MessageLatencyGauge")));
+ private SimpleDynamicMetric<Long> _totalMessageCount;
+ private SimpleDynamicMetric<Long> _totalMessageLatency;
+ private HistogramDynamicMetric _messageLatencyGauge;
public MessageLatencyMonitor(String domainName, String participantName) throws JMException {
_domainName = domainName;
_participantName = participantName;
_sensorName = String.format("%s.%s", ParticipantMessageMonitor.PARTICIPANT_STATUS_KEY,
"MessageLatency");
+ // Keep the pre-existing metric name "MessageLatencyGauge" so JMX consumers still find it; histogram uses a DEFAULT_RESET_INTERVAL_MS sliding window.
+ _messageLatencyGauge = new HistogramDynamicMetric("MessageLatencyGauge", new Histogram(
+ new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _totalMessageLatency = new SimpleDynamicMetric("TotalMessageLatency", 0l);
+ _totalMessageCount = new SimpleDynamicMetric("TotalMessageCount", 0l);
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 3318ddd..662f323 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -19,6 +19,9 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
+import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -33,46 +36,27 @@ import javax.management.ObjectName;
import java.util.*;
public class ResourceMonitor extends DynamicMBeanProvider {
- private static final long RESET_TIME_RANGE = 1000 * 60 * 60; // 1 hour
// Gauges
- private SimpleDynamicMetric<Integer> _numOfPartitions =
- new SimpleDynamicMetric("PartitionGauge", 0);
- private SimpleDynamicMetric<Integer> _numOfPartitionsInExternalView =
- new SimpleDynamicMetric("ExternalViewPartitionGauge", 0);
- private SimpleDynamicMetric<Integer> _numOfErrorPartitions =
- new SimpleDynamicMetric("ErrorPartitionGauge", 0);
- private SimpleDynamicMetric<Integer> _numNonTopStatePartitions =
- new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0);
- private SimpleDynamicMetric<Long> _numLessMinActiveReplicaPartitions =
- new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0l);
- private SimpleDynamicMetric<Long> _numLessReplicaPartitions =
- new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0l);
- private SimpleDynamicMetric<Long> _numPendingRecoveryRebalancePartitions =
- new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0l);
- private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions =
- new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0l);
- private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions =
- new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0l);
- private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions =
- new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0l);
- private SimpleDynamicMetric<Integer> _externalViewIdealStateDiff =
- new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0);
+ private SimpleDynamicMetric<Integer> _numOfPartitions;
+ private SimpleDynamicMetric<Integer> _numOfPartitionsInExternalView;
+ private SimpleDynamicMetric<Integer> _numOfErrorPartitions;
+ private SimpleDynamicMetric<Integer> _numNonTopStatePartitions;
+ private SimpleDynamicMetric<Long> _numLessMinActiveReplicaPartitions;
+ private SimpleDynamicMetric<Long> _numLessReplicaPartitions;
+ private SimpleDynamicMetric<Long> _numPendingRecoveryRebalancePartitions;
+ private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions;
+ private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions;
+ private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions;
+ private SimpleDynamicMetric<Integer> _externalViewIdealStateDiff;
// Counters
- private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter =
- new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0l);
- private SimpleDynamicMetric<Long> _successTopStateHandoffCounter =
- new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0l);
- private SimpleDynamicMetric<Long> _failedTopStateHandoffCounter =
- new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0l);
- private SimpleDynamicMetric<Long> _maxSinglePartitionTopStateHandoffDuration =
- new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0l);
- private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge =
- new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", _metricRegistry
- .histogram(getMetricName("PartitionTopStateHandoffDurationGauge")));
- private SimpleDynamicMetric<Long> _totalMessageReceived =
- new SimpleDynamicMetric("TotalMessageReceived", 0l);
+ private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter;
+ private SimpleDynamicMetric<Long> _successTopStateHandoffCounter;
+ private SimpleDynamicMetric<Long> _failedTopStateHandoffCounter;
+ private SimpleDynamicMetric<Long> _maxSinglePartitionTopStateHandoffDuration;
+ private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge;
+ private SimpleDynamicMetric<Long> _totalMessageReceived;
private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
private long _lastResetTime;
@@ -114,6 +98,34 @@ public class ResourceMonitor extends DynamicMBeanProvider {
_clusterName = clusterName;
_resourceName = resourceName;
_initObjectName = objectName;
+ // Seed each gauge with a value of its declared type: Integer gauges get 0, Long gauges get 0l.
+ _externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0);
+ _numLoadRebalanceThrottledPartitions =
+ new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0l);
+ _numRecoveryRebalanceThrottledPartitions =
+ new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0l);
+ _numPendingLoadRebalancePartitions =
+ new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0l);
+ _numPendingRecoveryRebalancePartitions =
+ new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0l);
+ _numLessReplicaPartitions = new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0l);
+ _numLessMinActiveReplicaPartitions =
+ new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0l);
+ _numNonTopStatePartitions = new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0);
+ _numOfErrorPartitions = new SimpleDynamicMetric("ErrorPartitionGauge", 0);
+ _numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0);
+ _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0);
+ // Handoff-duration histogram statistics cover a sliding window of DEFAULT_RESET_INTERVAL_MS.
+ _partitionTopStateHandoffDurationGauge =
+ new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram(
+ new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0l);
+ _maxSinglePartitionTopStateHandoffDuration =
+ new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0l);
+ _failedTopStateHandoffCounter = new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0l);
+ _successTopStateHandoffCounter = new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0l);
+ _successfulTopStateHandoffDurationCounter =
+ new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0l);
}
@Override
@@ -341,7 +353,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
}
public void resetMaxTopStateHandoffGauge() {
- if (_lastResetTime + RESET_TIME_RANGE <= System.currentTimeMillis()) {
+ if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis()) {
_maxSinglePartitionTopStateHandoffDuration.updateValue(0l);
_lastResetTime = System.currentTimeMillis();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
index e87738b..bc6a36b 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
@@ -19,6 +19,9 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
+import java.util.concurrent.TimeUnit;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
@@ -62,30 +65,19 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
}
}
- private SimpleDynamicMetric<Long> _readCounter = new SimpleDynamicMetric("ReadCounter", 0l);
- private SimpleDynamicMetric<Long> _writeCounter = new SimpleDynamicMetric("WriteCounter", 0l);
- private SimpleDynamicMetric<Long> _readBytesCounter =
- new SimpleDynamicMetric("ReadBytesCounter", 0l);
- private SimpleDynamicMetric<Long> _writeBytesCounter =
- new SimpleDynamicMetric("WriteBytesCounter", 0l);
- private SimpleDynamicMetric<Long> _readFailureCounter =
- new SimpleDynamicMetric("ReadFailureCounter", 0l);
- private SimpleDynamicMetric<Long> _writeFailureCounter =
- new SimpleDynamicMetric("WriteFailureCounter", 0l);
- private SimpleDynamicMetric<Long> _readTotalLatencyCounter =
- new SimpleDynamicMetric("ReadTotalLatencyCounter", 0l);
- private SimpleDynamicMetric<Long> _writeTotalLatencyCounter =
- new SimpleDynamicMetric("WriteTotalLatencyCounter", 0l);
-
- private HistogramDynamicMetric _readLatencyGauge = new HistogramDynamicMetric("ReadLatencyGauge",
- _metricRegistry.histogram(getMetricName("ReadLatencyGauge")));
- private HistogramDynamicMetric _writeLatencyGauge =
- new HistogramDynamicMetric("WriteLatencyGauge",
- _metricRegistry.histogram(getMetricName("WriteLatencyGauge")));
- private HistogramDynamicMetric _readBytesGauge = new HistogramDynamicMetric("ReadBytesGauge",
- _metricRegistry.histogram(getMetricName("ReadBytesGauge")));
- private HistogramDynamicMetric _writeBytesGauge = new HistogramDynamicMetric("WriteBytesGauge",
- _metricRegistry.histogram(getMetricName("WriteBytesGauge")));
+ private SimpleDynamicMetric<Long> _readCounter;
+ private SimpleDynamicMetric<Long> _writeCounter;
+ private SimpleDynamicMetric<Long> _readBytesCounter;
+ private SimpleDynamicMetric<Long> _writeBytesCounter;
+ private SimpleDynamicMetric<Long> _readFailureCounter;
+ private SimpleDynamicMetric<Long> _writeFailureCounter;
+ private SimpleDynamicMetric<Long> _readTotalLatencyCounter;
+ private SimpleDynamicMetric<Long> _writeTotalLatencyCounter;
+
+ private HistogramDynamicMetric _readLatencyGauge;
+ private HistogramDynamicMetric _writeLatencyGauge;
+ private HistogramDynamicMetric _readBytesGauge;
+ private HistogramDynamicMetric _writeBytesGauge;
@Override
public String getSensorName() {
@@ -101,6 +93,24 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
_sensorName = String
.format("%s.%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), monitorType, monitorKey,
path.name());
+ // All read/write counters start at 0l.
+ _writeTotalLatencyCounter = new SimpleDynamicMetric("WriteTotalLatencyCounter", 0l);
+ _readTotalLatencyCounter = new SimpleDynamicMetric("ReadTotalLatencyCounter", 0l);
+ _writeFailureCounter = new SimpleDynamicMetric("WriteFailureCounter", 0l);
+ _readFailureCounter = new SimpleDynamicMetric("ReadFailureCounter", 0l);
+ _writeBytesCounter = new SimpleDynamicMetric("WriteBytesCounter", 0l);
+ _readBytesCounter = new SimpleDynamicMetric("ReadBytesCounter", 0l);
+ _writeCounter = new SimpleDynamicMetric("WriteCounter", 0l);
+ _readCounter = new SimpleDynamicMetric("ReadCounter", 0l);
+ // Each histogram gets its own DEFAULT_RESET_INTERVAL_MS sliding window. NOTE(review): confirm memory cost — a time-window reservoir may retain every sample recorded in the window.
+ _readLatencyGauge = new HistogramDynamicMetric("ReadLatencyGauge", new Histogram(
+ new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _writeLatencyGauge = new HistogramDynamicMetric("WriteLatencyGauge", new Histogram(
+ new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _readBytesGauge = new HistogramDynamicMetric("ReadBytesGauge", new Histogram(
+ new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _writeBytesGauge = new HistogramDynamicMetric("WriteBytesGauge", new Histogram(
+ new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
}
public ZkClientPathMonitor register() throws JMException {
http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
index cc97b3b..b44c63c 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
@@ -19,9 +19,6 @@ package org.apache.helix.monitoring.mbeans.dynamicMBeans;
* under the License.
*/
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
import org.apache.helix.HelixException;
import org.apache.helix.monitoring.SensorNameProvider;
import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
@@ -36,7 +33,7 @@ import java.util.*;
*/
public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNameProvider {
protected final Logger _logger = LoggerFactory.getLogger(getClass());
- protected static final MetricRegistry _metricRegistry = new MetricRegistry();
+ protected static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 hour; default histogram sliding window and max-gauge reset interval for subclasses
private static String SENSOR_NAME_TAG = "SensorName";
private static String DEFAULT_DESCRIPTION =
"Information on the management interface of the MBean";
@@ -86,11 +83,6 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
doRegister(dynamicMetrics, null, objectName);
}
- protected String getMetricName(String metricName) {
- return MetricRegistry
- .name(getClass().getSimpleName(), Integer.toHexString(hashCode()), metricName);
- }
-
/**
* Update the Dynamic MBean provider with new metric list.
*
@@ -145,13 +137,6 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
* After unregistered, the MBean can't be registered again, a new monitor has be to created.
*/
public synchronized void unregister() {
- final String metricNamePrefix = getMetricName(null);
- _metricRegistry.removeMatching(new MetricFilter() {
- @Override
- public boolean matches(String name, Metric metric) {
- return name.startsWith(metricNamePrefix);
- }
- });
MBeanRegistrar.unregister(_objectName);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
index eb4f94b..b607add 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
@@ -22,8 +22,10 @@ package org.apache.helix.monitoring;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
+import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
@@ -40,21 +42,43 @@ import org.testng.Assert;
import org.testng.annotations.Test;
public class TestClusterEventStatusMonitor {
+ private static final int TEST_SLIDING_WINDOW_MS = 2000; // 2s window for testing
+
+ private static class ClusterStatusMonitorForTest extends ClusterStatusMonitor { // exposes the protected event-monitor map to the test
+ public ClusterStatusMonitorForTest(String clusterName) {
+ super(clusterName);
+ }
+ public ConcurrentHashMap<String, ClusterEventMonitor> getClusterEventMBean() {
+ return _clusterEventMbeanMap;
+ }
+ }
@Test()
public void test()
throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException,
IOException, InterruptedException, MBeanException, AttributeNotFoundException,
- ReflectionException {
+ ReflectionException {
System.out.println("START TestClusterEventStatusMonitor");
String clusterName = "TestCluster";
- ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+ ClusterStatusMonitorForTest monitor = new ClusterStatusMonitorForTest(clusterName);
MBeanServer _server = ManagementFactory.getPlatformMBeanServer();
Set<ObjectInstance> mbeans =
_server.queryMBeans(new ObjectName("ClusterStatus:Cluster=TestCluster,eventName=ClusterEvent,*"), null);
Assert.assertEquals(mbeans.size(), 0);
+ // Customize event monitors for testing
+ try {
+ this.addTestEventMonitor(monitor, ClusterEventMonitor.PhaseName.Callback.name());
+ this.addTestEventMonitor(monitor, ClusterEventMonitor.PhaseName.InQueue.name());
+ this.addTestEventMonitor(monitor, BestPossibleStateCalcStage.class.getSimpleName());
+ this.addTestEventMonitor(monitor, ReadClusterDataStage.class.getSimpleName());
+ this.addTestEventMonitor(monitor, IntermediateStateCalcStage.class.getSimpleName());
+ this.addTestEventMonitor(monitor, TaskAssignmentStage.class.getSimpleName());
+ } catch (JMException jme) {
+ Assert.fail("Failed to customize event monitors", jme);
+ }
+
int count = 5;
Long totalDuration = 0L;
for (int i = 1; i <= count; i++) {
@@ -77,9 +101,39 @@ public class TestClusterEventStatusMonitor {
Long maxDuration = (Long) _server.getAttribute(mbean.getObjectName(), "MaxSingleDurationGauge");
Long eventCount = (Long) _server.getAttribute(mbean.getObjectName(), "EventCounter");
+ Double pct75th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct75th");
+ Double pct95th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct95th");
+ Double pct99th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct99th");
+ Long max = (Long) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Max");
+ Double stddev = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.StdDev");
+
Assert.assertEquals(duration, totalDuration);
Assert.assertEquals(maxDuration, Long.valueOf(100 * count));
Assert.assertEquals(eventCount, Long.valueOf(count));
+ Assert.assertTrue(Math.abs(pct75th - 450.0) < 1);
+ Assert.assertTrue(Math.abs(pct95th - 500.0) < 1);
+ Assert.assertTrue(Math.abs(pct99th - 500.0) < 1);
+ Assert.assertTrue(max == 500);
+ Assert.assertTrue(Math.abs(stddev - 158.0) < 0.2);
+ }
+
+ System.out.println("\nWaiting for time window to expire\n");
+ Thread.sleep(TEST_SLIDING_WINDOW_MS + 100); // small margin: sleep timing is approximate, so ensure every sample is strictly older than the window
+
+ // Since the sliding window has expired, make sure the histograms have been reset
+ for (ObjectInstance mbean : mbeans) {
+ Double pct75th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct75th");
+ Double pct95th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct95th");
+ Double pct99th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct99th");
+ Long max = (Long) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Max");
+ Double stddev = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.StdDev");
+
+ Assert.assertTrue(pct75th == 0.0);
+ Assert.assertTrue(pct95th == 0.0);
+ Assert.assertTrue(pct99th == 0.0);
+ Assert.assertTrue(max == 0);
+ Assert.assertTrue(stddev == 0.0);
+
}
monitor.reset();
@@ -91,4 +145,14 @@ public class TestClusterEventStatusMonitor {
System.out.println("END TestParticipantMonitor");
}
+ // Registers a ClusterEventMonitor whose histogram uses TEST_SLIDING_WINDOW_MS and stores it under phaseName.
+ private void addTestEventMonitor(ClusterStatusMonitorForTest monitor, String phaseName) throws
+ JMException {
+ ConcurrentHashMap<String, ClusterEventMonitor> mbean = monitor.getClusterEventMBean();
+ ClusterEventMonitor eventMonitor = new ClusterEventMonitor(monitor, phaseName,
+ TEST_SLIDING_WINDOW_MS);
+ eventMonitor.register();
+ mbean.put(phaseName, eventMonitor);
+ }
+
}