You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/01/25 21:49:00 UTC

[09/50] [abbrv] helix git commit: use SlidingTimeWindowReservoir for histogram stats

use SlidingTimeWindowReservoir for histogram stats


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3a73b0f3
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3a73b0f3
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3a73b0f3

Branch: refs/heads/master
Commit: 3a73b0f30336c4610a501af979d55c4d25344214
Parents: 2f791a6
Author: hrzhang <hr...@linkedin.com>
Authored: Tue Nov 7 11:38:36 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Jan 24 18:31:00 2018 -0800

----------------------------------------------------------------------
 .../monitoring/mbeans/ClusterEventMonitor.java  | 35 ++++++--
 .../monitoring/mbeans/ClusterStatusMonitor.java |  2 +-
 .../monitoring/mbeans/HelixCallbackMonitor.java | 20 +++--
 .../mbeans/MessageLatencyMonitor.java           | 18 ++--
 .../monitoring/mbeans/ResourceMonitor.java      | 86 +++++++++++---------
 .../monitoring/mbeans/ZkClientPathMonitor.java  | 58 +++++++------
 .../dynamicMBeans/DynamicMBeanProvider.java     | 17 +---
 .../TestClusterEventStatusMonitor.java          | 68 +++++++++++++++-
 8 files changed, 201 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
index e7f09ea..8c77466 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterEventMonitor.java
@@ -19,6 +19,9 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
+import java.util.concurrent.TimeUnit;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
@@ -35,20 +38,16 @@ public class ClusterEventMonitor extends DynamicMBeanProvider {
     TotalProcessed
   }
 
-  private static final long RESET_INTERVAL = 1000 * 60 * 10; // 1 hour
   private static final String CLUSTEREVENT_DN_KEY = "ClusterEventStatus";
   private static final String EVENT_DN_KEY = "eventName";
   private static final String PHASE_DN_KEY = "phaseName";
 
   private final String _phaseName;
 
-  private SimpleDynamicMetric<Long> _totalDuration =
-      new SimpleDynamicMetric("TotalDurationCounter", 0l);
-  private SimpleDynamicMetric<Long> _maxDuration =
-      new SimpleDynamicMetric("MaxSingleDurationGauge", 0l);
-  private SimpleDynamicMetric<Long> _count = new SimpleDynamicMetric("EventCounter", 0l);
-  private HistogramDynamicMetric _duration = new HistogramDynamicMetric("DurationGauge",
-      _metricRegistry.histogram(getMetricName("DurationGauge")));
+  private SimpleDynamicMetric<Long> _totalDuration;
+  private SimpleDynamicMetric<Long> _maxDuration;
+  private SimpleDynamicMetric<Long> _count;
+  private HistogramDynamicMetric _duration;
 
   private long _lastResetTime;
   private ClusterStatusMonitor _clusterStatusMonitor;
@@ -56,13 +55,31 @@ public class ClusterEventMonitor extends DynamicMBeanProvider {
   public ClusterEventMonitor(ClusterStatusMonitor clusterStatusMonitor, String phaseName) {
     _phaseName = phaseName;
     _clusterStatusMonitor = clusterStatusMonitor;
+
+    _duration = new HistogramDynamicMetric("DurationGauge", new Histogram(
+        new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _count = new SimpleDynamicMetric("EventCounter", 0l);
+    _maxDuration = new SimpleDynamicMetric("MaxSingleDurationGauge", 0l);
+    _totalDuration = new SimpleDynamicMetric("TotalDurationCounter", 0l);
+  }
+
+  public ClusterEventMonitor(ClusterStatusMonitor clusterStatusMonitor, String phaseName,
+      int histogramTimeWindowMs) {
+    _phaseName = phaseName;
+    _clusterStatusMonitor = clusterStatusMonitor;
+
+    _duration = new HistogramDynamicMetric("DurationGauge", new Histogram(
+        new SlidingTimeWindowReservoir(histogramTimeWindowMs, TimeUnit.MILLISECONDS)));
+    _count = new SimpleDynamicMetric("EventCounter", 0l);
+    _maxDuration = new SimpleDynamicMetric("MaxSingleDurationGauge", 0l);
+    _totalDuration = new SimpleDynamicMetric("TotalDurationCounter", 0l);
   }
 
   public void reportDuration(long duration) {
     _totalDuration.updateValue(_totalDuration.getValue() + duration);
     _count.updateValue(_count.getValue() + 1);
     _duration.updateValue(duration);
-    if (_lastResetTime + RESET_INTERVAL <= System.currentTimeMillis() ||
+    if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis() ||
         duration > _maxDuration.getValue()) {
       _maxDuration.updateValue(duration);
       _lastResetTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index c644d10..61f4ce1 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -69,7 +69,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = new ConcurrentHashMap<>();
 
   // phaseName -> eventMonitor
-  private final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMbeanMap =
+  protected final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMbeanMap =
       new ConcurrentHashMap<>();
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
index 0dbafb4..0fc2001 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/HelixCallbackMonitor.java
@@ -19,6 +19,8 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.InstanceType;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
@@ -28,6 +30,7 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
 import javax.management.JMException;
 import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.List;
 
 public class HelixCallbackMonitor extends DynamicMBeanProvider {
@@ -42,14 +45,11 @@ public class HelixCallbackMonitor extends DynamicMBeanProvider {
   private final String _clusterName;
   private final String _instanceName;
 
-  private SimpleDynamicMetric<Long> _counter = new SimpleDynamicMetric("Counter", 0l);
-  private SimpleDynamicMetric<Long> _unbatchedCounter =
-      new SimpleDynamicMetric("UnbatchedCounter", 0l);
-  private SimpleDynamicMetric<Long> _totalLatencyCounter =
-      new SimpleDynamicMetric("LatencyCounter", 0l);
+  private SimpleDynamicMetric<Long> _counter;
+  private SimpleDynamicMetric<Long> _unbatchedCounter;
+  private SimpleDynamicMetric<Long> _totalLatencyCounter;
 
-  private HistogramDynamicMetric _latencyGauge = new HistogramDynamicMetric("LatencyGauge",
-      _metricRegistry.histogram(getMetricName("LatencyGauge")));
+  private HistogramDynamicMetric _latencyGauge;
 
   public HelixCallbackMonitor(InstanceType type, String clusterName, String instanceName,
       HelixConstants.ChangeType changeType) throws JMException {
@@ -62,6 +62,12 @@ public class HelixCallbackMonitor extends DynamicMBeanProvider {
     _sensorName = String
         .format("%s.%s.%s.%s", MonitorDomainNames.HelixCallback.name(), type.name(), clusterName,
             changeType.name());
+
+    _latencyGauge = new HistogramDynamicMetric("LatencyGauge", new Histogram(
+        new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _totalLatencyCounter = new SimpleDynamicMetric("LatencyCounter", 0l);
+    _unbatchedCounter = new SimpleDynamicMetric("UnbatchedCounter", 0l);
+    _counter = new SimpleDynamicMetric("Counter", 0l);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
index f1c1039..dac5826 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MessageLatencyMonitor.java
@@ -19,6 +19,9 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
+import java.util.concurrent.TimeUnit;
 import org.apache.helix.model.Message;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
@@ -35,19 +38,20 @@ public class MessageLatencyMonitor extends DynamicMBeanProvider {
   private final String _domainName;
   private final String _participantName;
 
-  private SimpleDynamicMetric<Long> _totalMessageCount =
-      new SimpleDynamicMetric("TotalMessageCount", 0l);
-  private SimpleDynamicMetric<Long> _totalMessageLatency =
-      new SimpleDynamicMetric("TotalMessageLatency", 0l);
-  private HistogramDynamicMetric _messageLatencyGauge =
-      new HistogramDynamicMetric("MessageLatencyGauge",
-          _metricRegistry.histogram(getMetricName("MessageLatencyGauge")));
+  private SimpleDynamicMetric<Long> _totalMessageCount;
+  private SimpleDynamicMetric<Long> _totalMessageLatency;
+  private HistogramDynamicMetric _messageLatencyGauge;
 
   public MessageLatencyMonitor(String domainName, String participantName) throws JMException {
     _domainName = domainName;
     _participantName = participantName;
     _sensorName = String.format("%s.%s", ParticipantMessageMonitor.PARTICIPANT_STATUS_KEY,
         "MessageLatency");
+
+    _messageLatencyGauge = new HistogramDynamicMetric("MessagelatencyGauge", new Histogram(
+        new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _totalMessageLatency = new SimpleDynamicMetric("TotalMessageLatency", 0l);
+    _totalMessageCount = new SimpleDynamicMetric("TotalMessageCount", 0l);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 3318ddd..662f323 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -19,6 +19,9 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
+import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -33,46 +36,27 @@ import javax.management.ObjectName;
 import java.util.*;
 
 public class ResourceMonitor extends DynamicMBeanProvider {
-  private static final long RESET_TIME_RANGE = 1000 * 60 * 60; // 1 hour
 
   // Gauges
-  private SimpleDynamicMetric<Integer> _numOfPartitions =
-      new SimpleDynamicMetric("PartitionGauge", 0);
-  private SimpleDynamicMetric<Integer> _numOfPartitionsInExternalView =
-      new SimpleDynamicMetric("ExternalViewPartitionGauge", 0);
-  private SimpleDynamicMetric<Integer> _numOfErrorPartitions =
-      new SimpleDynamicMetric("ErrorPartitionGauge", 0);
-  private SimpleDynamicMetric<Integer> _numNonTopStatePartitions =
-      new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0);
-  private SimpleDynamicMetric<Long> _numLessMinActiveReplicaPartitions =
-      new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0l);
-  private SimpleDynamicMetric<Long> _numLessReplicaPartitions =
-      new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0l);
-  private SimpleDynamicMetric<Long> _numPendingRecoveryRebalancePartitions =
-      new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0l);
-  private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions =
-      new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0l);
-  private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions =
-      new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0l);
-  private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions =
-      new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0l);
-  private SimpleDynamicMetric<Integer> _externalViewIdealStateDiff =
-      new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0);
+  private SimpleDynamicMetric<Integer> _numOfPartitions;
+  private SimpleDynamicMetric<Integer> _numOfPartitionsInExternalView;
+  private SimpleDynamicMetric<Integer> _numOfErrorPartitions;
+  private SimpleDynamicMetric<Integer> _numNonTopStatePartitions;
+  private SimpleDynamicMetric<Long> _numLessMinActiveReplicaPartitions;
+  private SimpleDynamicMetric<Long> _numLessReplicaPartitions;
+  private SimpleDynamicMetric<Long> _numPendingRecoveryRebalancePartitions;
+  private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions;
+  private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions;
+  private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions;
+  private SimpleDynamicMetric<Integer> _externalViewIdealStateDiff;
 
   // Counters
-  private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter =
-      new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0l);
-  private SimpleDynamicMetric<Long> _successTopStateHandoffCounter =
-      new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0l);
-  private SimpleDynamicMetric<Long> _failedTopStateHandoffCounter =
-      new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0l);
-  private SimpleDynamicMetric<Long> _maxSinglePartitionTopStateHandoffDuration =
-      new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0l);
-  private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge =
-      new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", _metricRegistry
-          .histogram(getMetricName("PartitionTopStateHandoffDurationGauge")));
-  private SimpleDynamicMetric<Long> _totalMessageReceived =
-      new SimpleDynamicMetric("TotalMessageReceived", 0l);
+  private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter;
+  private SimpleDynamicMetric<Long> _successTopStateHandoffCounter;
+  private SimpleDynamicMetric<Long> _failedTopStateHandoffCounter;
+  private SimpleDynamicMetric<Long> _maxSinglePartitionTopStateHandoffDuration;
+  private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge;
+  private SimpleDynamicMetric<Long> _totalMessageReceived;
 
   private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private long _lastResetTime;
@@ -114,6 +98,34 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _clusterName = clusterName;
     _resourceName = resourceName;
     _initObjectName = objectName;
+
+    _externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0l);
+    _numLoadRebalanceThrottledPartitions =
+        new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0l);
+    _numRecoveryRebalanceThrottledPartitions =
+        new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0l);
+    _numPendingLoadRebalancePartitions =
+        new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0l);
+    _numPendingRecoveryRebalancePartitions =
+        new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0l);
+    _numLessReplicaPartitions = new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0l);
+    _numLessMinActiveReplicaPartitions =
+        new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0l);
+    _numNonTopStatePartitions = new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0l);
+    _numOfErrorPartitions = new SimpleDynamicMetric("ErrorPartitionGauge", 0l);
+    _numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0l);
+    _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0l);
+
+    _partitionTopStateHandoffDurationGauge =
+        new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram(
+            new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0l);
+    _maxSinglePartitionTopStateHandoffDuration =
+        new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0l);
+    _failedTopStateHandoffCounter = new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0l);
+    _successTopStateHandoffCounter = new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0l);
+    _successfulTopStateHandoffDurationCounter =
+        new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0l);
   }
 
   @Override
@@ -341,7 +353,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   }
 
   public void resetMaxTopStateHandoffGauge() {
-    if (_lastResetTime + RESET_TIME_RANGE <= System.currentTimeMillis()) {
+    if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis()) {
       _maxSinglePartitionTopStateHandoffDuration.updateValue(0l);
       _lastResetTime = System.currentTimeMillis();
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
index e87738b..bc6a36b 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
@@ -19,6 +19,9 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowReservoir;
+import java.util.concurrent.TimeUnit;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
@@ -62,30 +65,19 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
     }
   }
 
-  private SimpleDynamicMetric<Long> _readCounter = new SimpleDynamicMetric("ReadCounter", 0l);
-  private SimpleDynamicMetric<Long> _writeCounter = new SimpleDynamicMetric("WriteCounter", 0l);
-  private SimpleDynamicMetric<Long> _readBytesCounter =
-      new SimpleDynamicMetric("ReadBytesCounter", 0l);
-  private SimpleDynamicMetric<Long> _writeBytesCounter =
-      new SimpleDynamicMetric("WriteBytesCounter", 0l);
-  private SimpleDynamicMetric<Long> _readFailureCounter =
-      new SimpleDynamicMetric("ReadFailureCounter", 0l);
-  private SimpleDynamicMetric<Long> _writeFailureCounter =
-      new SimpleDynamicMetric("WriteFailureCounter", 0l);
-  private SimpleDynamicMetric<Long> _readTotalLatencyCounter =
-      new SimpleDynamicMetric("ReadTotalLatencyCounter", 0l);
-  private SimpleDynamicMetric<Long> _writeTotalLatencyCounter =
-      new SimpleDynamicMetric("WriteTotalLatencyCounter", 0l);
-
-  private HistogramDynamicMetric _readLatencyGauge = new HistogramDynamicMetric("ReadLatencyGauge",
-      _metricRegistry.histogram(getMetricName("ReadLatencyGauge")));
-  private HistogramDynamicMetric _writeLatencyGauge =
-      new HistogramDynamicMetric("WriteLatencyGauge",
-          _metricRegistry.histogram(getMetricName("WriteLatencyGauge")));
-  private HistogramDynamicMetric _readBytesGauge = new HistogramDynamicMetric("ReadBytesGauge",
-      _metricRegistry.histogram(getMetricName("ReadBytesGauge")));
-  private HistogramDynamicMetric _writeBytesGauge = new HistogramDynamicMetric("WriteBytesGauge",
-      _metricRegistry.histogram(getMetricName("WriteBytesGauge")));
+  private SimpleDynamicMetric<Long> _readCounter;
+  private SimpleDynamicMetric<Long> _writeCounter;
+  private SimpleDynamicMetric<Long> _readBytesCounter;
+  private SimpleDynamicMetric<Long> _writeBytesCounter;
+  private SimpleDynamicMetric<Long> _readFailureCounter;
+  private SimpleDynamicMetric<Long> _writeFailureCounter;
+  private SimpleDynamicMetric<Long> _readTotalLatencyCounter;
+  private SimpleDynamicMetric<Long> _writeTotalLatencyCounter;
+
+  private HistogramDynamicMetric _readLatencyGauge;
+  private HistogramDynamicMetric _writeLatencyGauge;
+  private HistogramDynamicMetric _readBytesGauge;
+  private HistogramDynamicMetric _writeBytesGauge;
 
   @Override
   public String getSensorName() {
@@ -101,6 +93,24 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
     _sensorName = String
         .format("%s.%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), monitorType, monitorKey,
             path.name());
+
+    _writeTotalLatencyCounter = new SimpleDynamicMetric("WriteTotalLatencyCounter", 0l);
+    _readTotalLatencyCounter = new SimpleDynamicMetric("ReadTotalLatencyCounter", 0l);
+    _writeFailureCounter = new SimpleDynamicMetric("WriteFailureCounter", 0l);
+    _readFailureCounter = new SimpleDynamicMetric("ReadFailureCounter", 0l);
+    _writeBytesCounter = new SimpleDynamicMetric("WriteBytesCounter", 0l);
+    _readBytesCounter = new SimpleDynamicMetric("ReadBytesCounter", 0l);
+    _writeCounter = new SimpleDynamicMetric("WriteCounter", 0l);
+    _readCounter = new SimpleDynamicMetric("ReadCounter", 0l);
+
+    _readLatencyGauge = new HistogramDynamicMetric("ReadLatencyGauge", new Histogram(
+        new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _writeLatencyGauge = new HistogramDynamicMetric("WriteLatencyGauge", new Histogram(
+        new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _readBytesGauge = new HistogramDynamicMetric("ReadBytesGauge", new Histogram(
+        new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _writeBytesGauge = new HistogramDynamicMetric("WriteBytesGauge", new Histogram(
+        new SlidingTimeWindowReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
   }
 
   public ZkClientPathMonitor register() throws JMException {

http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
index cc97b3b..b44c63c 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
@@ -19,9 +19,6 @@ package org.apache.helix.monitoring.mbeans.dynamicMBeans;
  * under the License.
  */
 
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
 import org.apache.helix.HelixException;
 import org.apache.helix.monitoring.SensorNameProvider;
 import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
@@ -36,7 +33,7 @@ import java.util.*;
  */
 public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNameProvider {
   protected final Logger _logger = LoggerFactory.getLogger(getClass());
-  protected static final MetricRegistry _metricRegistry = new MetricRegistry();
+  protected static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // Reset time every hour
   private static String SENSOR_NAME_TAG = "SensorName";
   private static String DEFAULT_DESCRIPTION =
       "Information on the management interface of the MBean";
@@ -86,11 +83,6 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
     doRegister(dynamicMetrics, null, objectName);
   }
 
-  protected String getMetricName(String metricName) {
-    return MetricRegistry
-        .name(getClass().getSimpleName(), Integer.toHexString(hashCode()), metricName);
-  }
-
   /**
    * Update the Dynamic MBean provider with new metric list.
    *
@@ -145,13 +137,6 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
    * After unregistered, the MBean can't be registered again, a new monitor has be to created.
    */
   public synchronized void unregister() {
-    final String metricNamePrefix = getMetricName(null);
-    _metricRegistry.removeMatching(new MetricFilter() {
-      @Override
-      public boolean matches(String name, Metric metric) {
-        return name.startsWith(metricNamePrefix);
-      }
-    });
     MBeanRegistrar.unregister(_objectName);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/3a73b0f3/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
index eb4f94b..b607add 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
@@ -22,8 +22,10 @@ package org.apache.helix.monitoring;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.management.AttributeNotFoundException;
 import javax.management.InstanceNotFoundException;
+import javax.management.JMException;
 import javax.management.MBeanException;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -40,21 +42,43 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestClusterEventStatusMonitor {
+  private static final int TEST_SLIDING_WINDOW_MS = 2000; // 2s window for testing
+
+  private class ClusterStatusMonitorForTest extends ClusterStatusMonitor {
+    public ClusterStatusMonitorForTest(String clusterName) {
+      super(clusterName);
+    }
+    public ConcurrentHashMap<String, ClusterEventMonitor> getClusterEventMBean() {
+      return _clusterEventMbeanMap;
+    }
+  }
 
   @Test()
   public void test()
       throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException,
       IOException, InterruptedException, MBeanException, AttributeNotFoundException,
-      ReflectionException {
+      ReflectionException{
     System.out.println("START TestClusterEventStatusMonitor");
     String clusterName = "TestCluster";
-    ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+    ClusterStatusMonitorForTest monitor = new ClusterStatusMonitorForTest(clusterName);
 
     MBeanServer _server = ManagementFactory.getPlatformMBeanServer();
     Set<ObjectInstance> mbeans =
         _server.queryMBeans(new ObjectName("ClusterStatus:Cluster=TestCluster,eventName=ClusterEvent,*"), null);
     Assert.assertEquals(mbeans.size(), 0);
 
+    // Customize event monitors for testing
+    try {
+      this.addTestEventMonitor(monitor, ClusterEventMonitor.PhaseName.Callback.name());
+      this.addTestEventMonitor(monitor, ClusterEventMonitor.PhaseName.InQueue.name());
+      this.addTestEventMonitor(monitor, BestPossibleStateCalcStage.class.getSimpleName());
+      this.addTestEventMonitor(monitor, ReadClusterDataStage.class.getSimpleName());
+      this.addTestEventMonitor(monitor, IntermediateStateCalcStage.class.getSimpleName());
+      this.addTestEventMonitor(monitor, TaskAssignmentStage.class.getSimpleName());
+    } catch (JMException jme) {
+      Assert.assertTrue(false, "Failed to customize event monitors");
+    }
+
     int count = 5;
     Long totalDuration = 0L;
     for (int i = 1; i <= count; i++) {
@@ -77,9 +101,39 @@ public class TestClusterEventStatusMonitor {
       Long maxDuration = (Long) _server.getAttribute(mbean.getObjectName(), "MaxSingleDurationGauge");
       Long eventCount = (Long) _server.getAttribute(mbean.getObjectName(), "EventCounter");
 
+      Double pct75th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct75th");
+      Double pct95th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct95th");
+      Double pct99th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct99th");
+      Long max = (Long) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Max");
+      Double stddev = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.StdDev");
+
       Assert.assertEquals(duration, totalDuration);
       Assert.assertEquals(maxDuration, Long.valueOf(100 * count));
       Assert.assertEquals(eventCount, Long.valueOf(count));
+      Assert.assertTrue(Math.abs(pct75th - 450.0) < 1);
+      Assert.assertTrue(Math.abs(pct95th - 500.0) < 1);
+      Assert.assertTrue(Math.abs(pct99th - 500.0) < 1);
+      Assert.assertTrue(max == 500);
+      Assert.assertTrue(Math.abs(stddev - 158.0) < 0.2);
+    }
+
+    System.out.println("\nWaiting for time window to expire\n");
+    Thread.sleep(TEST_SLIDING_WINDOW_MS);
+
+    // The sliding window has expired, so verify that the histogram values have been reset
+    for (ObjectInstance mbean : mbeans) {
+      Double pct75th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct75th");
+      Double pct95th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct95th");
+      Double pct99th = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Pct99th");
+      Long max = (Long) _server.getAttribute(mbean.getObjectName(), "DurationGauge.Max");
+      Double stddev = (Double) _server.getAttribute(mbean.getObjectName(), "DurationGauge.StdDev");
+
+      Assert.assertTrue(pct75th == 0.0);
+      Assert.assertTrue(pct95th == 0.0);
+      Assert.assertTrue(pct99th == 0.0);
+      Assert.assertTrue(max == 0);
+      Assert.assertTrue(stddev == 0.0);
+
     }
 
     monitor.reset();
@@ -91,4 +145,14 @@ public class TestClusterEventStatusMonitor {
 
     System.out.println("END TestParticipantMonitor");
   }
+
+  private void addTestEventMonitor(ClusterStatusMonitorForTest monitor, String phaseName) throws
+      JMException {
+    ConcurrentHashMap<String, ClusterEventMonitor> mbean = monitor.getClusterEventMBean();
+    ClusterEventMonitor eventMonitor = new ClusterEventMonitor(monitor, phaseName,
+        TEST_SLIDING_WINDOW_MS);
+    eventMonitor.register();
+    mbean.put(phaseName, eventMonitor);
+  }
+
 }