You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/09 20:52:42 UTC

helix git commit: [HELIX-714] [HaaS] Fix aggregate metrics in ClusterStatusMonitor

Repository: helix
Updated Branches:
  refs/heads/master e1ca65193 -> 8a6ac8ff2


[HELIX-714] [HaaS] Fix aggregate metrics in ClusterStatusMonitor

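The cluster-level aggregate metrics have been renamed to follow Helix's metric naming convention, and their values are now computed by looping over the per-resource monitors on each read instead of being maintained by applying delta values.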


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8a6ac8ff
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8a6ac8ff
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8a6ac8ff

Branch: refs/heads/master
Commit: 8a6ac8ff278aa9b4ad8445700266af820d0d62cc
Parents: e1ca651
Author: Hunter Lee <na...@gmail.com>
Authored: Mon Jul 9 12:24:46 2018 -0700
Committer: Hunter Lee <na...@gmail.com>
Committed: Mon Jul 9 13:52:19 2018 -0700

----------------------------------------------------------------------
 .../monitoring/mbeans/ClusterStatusMonitor.java |  59 +++++------
 .../mbeans/ClusterStatusMonitorMBean.java       |   8 +-
 .../monitoring/mbeans/ResourceMonitor.java      | 101 ++++++-------------
 .../mbeans/TestClusterAggregateMetrics.java     |  16 ++-
 4 files changed, 69 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 954ae7d..b2c5fb0 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -68,12 +68,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private boolean _rebalanceFailure = false;
   private AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
 
-  // Aggregate metrics from ResourceMonitors
-  private volatile long _totalPartitionCount = 0;
-  private volatile long _totalErrorPartitionCount = 0;
-  private volatile long _totalPartitionsWithoutTopStateCount = 0;
-  private volatile long _totalExternalViewIdealStateMismatchPartitionCount = 0;
-
   private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap = new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = new ConcurrentHashMap<>();
 
@@ -460,7 +454,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
           if (!_resourceMbeanMap.containsKey(resourceName)) {
             String beanName = getResourceBeanName(resourceName);
             ResourceMonitor bean =
-                new ResourceMonitor(this, _clusterName, resourceName, getObjectName(beanName));
+                new ResourceMonitor(_clusterName, resourceName, getObjectName(beanName));
             bean.register();
             _resourceMbeanMap.put(resourceName, bean);
           }
@@ -803,39 +797,38 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   @Override
-  public long getTotalPartitionCount() {
-    return _totalPartitionCount;
+  public long getTotalPartitionGauge() {
+    long total = 0;
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+      total += entry.getValue().getPartitionGauge();
+    }
+    return total;
   }
 
   @Override
-  public long getTotalErrorPartitionCount() {
-    return _totalErrorPartitionCount;
+  public long getErrorPartitionGauge() {
+    long total = 0;
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+      total += entry.getValue().getErrorPartitionGauge();
+    }
+    return total;
   }
 
   @Override
-  public long getTotalPartitionsWithoutTopStateCount() {
-    return _totalPartitionsWithoutTopStateCount;
+  public long getMissingTopStatePartitionGauge() {
+    long total = 0;
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+      total += entry.getValue().getMissingTopStatePartitionGauge();
+    }
+    return total;
   }
 
   @Override
-  public long getTotalExternalViewIdealStateMismatchPartitionCount() {
-    return _totalExternalViewIdealStateMismatchPartitionCount;
-  }
-
-  synchronized void applyDeltaToTotalPartitionCount(long delta) {
-    _totalPartitionCount += delta;
-  }
-
-  synchronized void applyDeltaToTotalErrorPartitionCount(long delta) {
-    _totalErrorPartitionCount += delta;
-  }
-
-  synchronized void applyDeltaToTotalPartitionsWithoutTopStateCount(long delta) {
-    _totalPartitionsWithoutTopStateCount += delta;
-  }
-
-  synchronized void applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(long delta) {
-    _totalExternalViewIdealStateMismatchPartitionCount += delta;
+  public long getDifferenceWithIdealStateGauge() {
+    long total = 0;
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+      total += entry.getValue().getDifferenceWithIdealStateGauge();
+    }
+    return total;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index 6ace495..81600cb 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -84,20 +84,20 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider {
   /**
    * @return number of all partitions in this cluster
    */
-  long getTotalPartitionCount();
+  long getTotalPartitionGauge();
 
   /**
    * @return number of all partitions in this cluster that have errors
    */
-  long getTotalErrorPartitionCount();
+  long getErrorPartitionGauge();
 
   /**
    * @return number of all partitions in this cluster without any top-state replicas
    */
-  long getTotalPartitionsWithoutTopStateCount();
+  long getMissingTopStatePartitionGauge();
 
   /**
    * @return number of all partitions in this cluster whose ExternalView and IdealState have discrepancies
    */
-  long getTotalExternalViewIdealStateMismatchPartitionCount();
+  long getDifferenceWithIdealStateGauge();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 125257b..d4b46b1 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -55,15 +55,16 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   private SimpleDynamicMetric<Long> _successTopStateHandoffCounter;
   private SimpleDynamicMetric<Long> _failedTopStateHandoffCounter;
   private SimpleDynamicMetric<Long> _maxSinglePartitionTopStateHandoffDuration;
-  private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge;
   private SimpleDynamicMetric<Long> _totalMessageReceived;
 
+  // Histograms
+  private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge;
+
   private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private long _lastResetTime;
   private final String _resourceName;
   private final String _clusterName;
   private final ObjectName _initObjectName;
-  private ClusterStatusMonitor _clusterStatusMonitor;
 
   @Override
   public ResourceMonitor register() throws JMException {
@@ -89,70 +90,48 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     return this;
   }
 
-  @Override
-  public synchronized void unregister() {
-    super.unregister();
-    // Also remove metrics propagated to aggregate metrics in ClusterStatusMonitor
-    if (_clusterStatusMonitor != null) {
-      _clusterStatusMonitor.applyDeltaToTotalPartitionCount(-_numOfPartitions.getValue());
-      _clusterStatusMonitor.applyDeltaToTotalErrorPartitionCount(-_numOfErrorPartitions.getValue());
-      _clusterStatusMonitor.applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(
-          -_externalViewIdealStateDiff.getValue());
-      _clusterStatusMonitor.applyDeltaToTotalPartitionsWithoutTopStateCount(-_numNonTopStatePartitions.getValue());
-    }
-  }
-
   public enum MonitorState {
     TOP_STATE
   }
 
   public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) {
-    this(null, clusterName, resourceName, objectName);
-  }
-
-  public ResourceMonitor(ClusterStatusMonitor clusterStatusMonitor, String clusterName, String resourceName,
-      ObjectName objectName) {
-    if (clusterStatusMonitor == null) {
-      _logger.warn("ResourceMonitor initialized without a reference to ClusterStatusMonitor (null): metrics will not "
-          + "be aggregated at the cluster level.");
-    }
-    _clusterStatusMonitor = clusterStatusMonitor;
     _clusterName = clusterName;
     _resourceName = resourceName;
     _initObjectName = objectName;
 
-    _externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0l);
+    _externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L);
     _numLoadRebalanceThrottledPartitions =
-        new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0l);
+        new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0L);
     _numRecoveryRebalanceThrottledPartitions =
-        new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0l);
+        new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0L);
     _numPendingLoadRebalancePartitions =
-        new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0l);
+        new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0L);
     _numPendingRecoveryRebalancePartitions =
-        new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0l);
-    _numLessReplicaPartitions = new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0l);
+        new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0L);
+    _numLessReplicaPartitions = new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0L);
     _numLessMinActiveReplicaPartitions =
-        new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0l);
-    _numNonTopStatePartitions = new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0l);
-    _numOfErrorPartitions = new SimpleDynamicMetric("ErrorPartitionGauge", 0l);
-    _numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0l);
-    _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0l);
+        new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0L);
+    _numNonTopStatePartitions = new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0L);
+    _numOfErrorPartitions = new SimpleDynamicMetric("ErrorPartitionGauge", 0L);
+    _numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0L);
+    _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0L);
 
     _partitionTopStateHandoffDurationGauge =
         new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram(
             new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
-    _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0l);
+    _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0L);
     _maxSinglePartitionTopStateHandoffDuration =
-        new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0l);
-    _failedTopStateHandoffCounter = new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0l);
-    _successTopStateHandoffCounter = new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0l);
+        new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0L);
+    _failedTopStateHandoffCounter = new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0L);
+    _successTopStateHandoffCounter = new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0L);
     _successfulTopStateHandoffDurationCounter =
-        new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0l);
+        new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0L);
   }
 
   @Override
   public String getSensorName() {
-    return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName, _tag, _resourceName);
+    return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName,
+        _tag, _resourceName);
   }
 
   public long getPartitionGauge() {
@@ -293,17 +272,6 @@ public class ResourceMonitor extends DynamicMBeanProvider {
       }
     }
 
-    // Update cluster-level aggregate metrics in ClusterStatusMonitor
-    if (_clusterStatusMonitor != null) {
-      _clusterStatusMonitor.applyDeltaToTotalPartitionCount(partitions.size() - _numOfPartitions.getValue());
-      _clusterStatusMonitor.applyDeltaToTotalErrorPartitionCount(
-          numOfErrorPartitions - _numOfErrorPartitions.getValue());
-      _clusterStatusMonitor.applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(
-          numOfDiff - _externalViewIdealStateDiff.getValue());
-      _clusterStatusMonitor.applyDeltaToTotalPartitionsWithoutTopStateCount(
-          (partitions.size() - numOfPartitionWithTopState) - _numNonTopStatePartitions.getValue());
-    }
-
     // Update resource-level metrics
     _numOfPartitions.updateValue((long) partitions.size());
     _numOfErrorPartitions.updateValue(numOfErrorPartitions);
@@ -320,21 +288,18 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   }
 
   private void resetGauges() {
-    // Disable reset for the following gauges:
-    // 1) Need the previous values for these gauges to compute delta for cluster-level metrics.
-    // 2) These four gauges are reset every time updateResource is called anyway.
-    //_numOfErrorPartitions.updateValue(0l);
-    //_numNonTopStatePartitions.updateValue(0l);
-    //_externalViewIdealStateDiff.updateValue(0l);
-    //_numOfPartitionsInExternalView.updateValue(0l);
+    _numOfErrorPartitions.updateValue(0L);
+    _numNonTopStatePartitions.updateValue(0L);
+    _externalViewIdealStateDiff.updateValue(0L);
+    _numOfPartitionsInExternalView.updateValue(0L);
 
     // The following gauges are computed each call to updateResource by way of looping so need to be reset.
-    _numLessMinActiveReplicaPartitions.updateValue(0l);
-    _numLessReplicaPartitions.updateValue(0l);
-    _numPendingRecoveryRebalancePartitions.updateValue(0l);
-    _numPendingLoadRebalancePartitions.updateValue(0l);
-    _numRecoveryRebalanceThrottledPartitions.updateValue(0l);
-    _numLoadRebalanceThrottledPartitions.updateValue(0l);
+    _numLessMinActiveReplicaPartitions.updateValue(0L);
+    _numLessReplicaPartitions.updateValue(0L);
+    _numPendingRecoveryRebalancePartitions.updateValue(0L);
+    _numPendingLoadRebalancePartitions.updateValue(0L);
+    _numRecoveryRebalanceThrottledPartitions.updateValue(0L);
+    _numLoadRebalanceThrottledPartitions.updateValue(0L);
   }
 
   public void updateStateHandoffStats(MonitorState monitorState, long duration, boolean succeeded) {
@@ -398,8 +363,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
 
   public void resetMaxTopStateHandoffGauge() {
     if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis()) {
-      _maxSinglePartitionTopStateHandoffDuration.updateValue(0l);
+      _maxSinglePartitionTopStateHandoffDuration.updateValue(0L);
       _lastResetTime = System.currentTimeMillis();
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
index ba3b654..dfe5016 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
@@ -61,10 +61,10 @@ public class TestClusterAggregateMetrics extends ZkIntegrationTestBase {
   private static final int NUM_PARTITIONS = 5;
   private static final int NUM_REPLICAS = 3;
 
-  private static final String PARTITION_COUNT = "TotalPartitionCount";
-  private static final String ERROR_PARTITION_COUNT = "TotalErrorPartitionCount";
-  private static final String WITHOUT_TOPSTATE_COUNT = "TotalPartitionsWithoutTopStateCount";
-  private static final String IS_EV_MISMATCH_COUNT = "TotalExternalViewIdealStateMismatchPartitionCount";
+  private static final String PARTITION_COUNT = "TotalPartitionGauge";
+  private static final String ERROR_PARTITION_COUNT = "ErrorPartitionGauge";
+  private static final String WITHOUT_TOPSTATE_COUNT = "MissingTopStatePartitionGauge";
+  private static final String IS_EV_MISMATCH_COUNT = "DifferenceWithIdealStateGauge";
 
   private static final int START_PORT = 12918;
   private static final String STATE_MODEL = "MasterSlave";
@@ -82,15 +82,11 @@ public class TestClusterAggregateMetrics extends ZkIntegrationTestBase {
   public void beforeClass() throws Exception {
     System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
 
-    String namespace = "/" + CLUSTER_NAME;
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursively(namespace);
-    }
     _setupTool = new ClusterSetup(ZK_ADDR);
-
     // setup storage cluster
     _setupTool.addCluster(CLUSTER_NAME, true);
     _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, NUM_PARTITIONS, STATE_MODEL);
+
     for (int i = 0; i < NUM_PARTICIPANTS; i++) {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
@@ -145,7 +141,7 @@ public class TestClusterAggregateMetrics extends ZkIntegrationTestBase {
   }
 
   @Test
-  public void testAggregateMetrics() throws InterruptedException {
+  public void testAggregateMetrics() throws Exception {
     // Everything should be up and running initially with 5 total partitions
     updateMetrics();
     Assert.assertEquals(_beanValueMap.get(PARTITION_COUNT), 5L);