You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/09 20:52:42 UTC
helix git commit: [HELIX-714] [HaaS] Fix aggregate metrics in ClusterStatusMonitor
Repository: helix
Updated Branches:
refs/heads/master e1ca65193 -> 8a6ac8ff2
[HELIX-714] [HaaS] Fix aggregate metrics in ClusterStatusMonitor
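The metric names now follow Helix's naming convention, and each cluster-level aggregate is computed by looping over the ResourceMonitors and summing their gauges, instead of being maintained by applying delta values.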
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8a6ac8ff
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8a6ac8ff
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8a6ac8ff
Branch: refs/heads/master
Commit: 8a6ac8ff278aa9b4ad8445700266af820d0d62cc
Parents: e1ca651
Author: Hunter Lee <na...@gmail.com>
Authored: Mon Jul 9 12:24:46 2018 -0700
Committer: Hunter Lee <na...@gmail.com>
Committed: Mon Jul 9 13:52:19 2018 -0700
----------------------------------------------------------------------
.../monitoring/mbeans/ClusterStatusMonitor.java | 59 +++++------
.../mbeans/ClusterStatusMonitorMBean.java | 8 +-
.../monitoring/mbeans/ResourceMonitor.java | 101 ++++++-------------
.../mbeans/TestClusterAggregateMetrics.java | 16 ++-
4 files changed, 69 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 954ae7d..b2c5fb0 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -68,12 +68,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private boolean _rebalanceFailure = false;
private AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
- // Aggregate metrics from ResourceMonitors
- private volatile long _totalPartitionCount = 0;
- private volatile long _totalErrorPartitionCount = 0;
- private volatile long _totalPartitionsWithoutTopStateCount = 0;
- private volatile long _totalExternalViewIdealStateMismatchPartitionCount = 0;
-
private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = new ConcurrentHashMap<>();
@@ -460,7 +454,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
if (!_resourceMbeanMap.containsKey(resourceName)) {
String beanName = getResourceBeanName(resourceName);
ResourceMonitor bean =
- new ResourceMonitor(this, _clusterName, resourceName, getObjectName(beanName));
+ new ResourceMonitor(_clusterName, resourceName, getObjectName(beanName));
bean.register();
_resourceMbeanMap.put(resourceName, bean);
}
@@ -803,39 +797,38 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
@Override
- public long getTotalPartitionCount() {
- return _totalPartitionCount;
+ public long getTotalPartitionGauge() {
+ long total = 0;
+ for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+ total += entry.getValue().getPartitionGauge();
+ }
+ return total;
}
@Override
- public long getTotalErrorPartitionCount() {
- return _totalErrorPartitionCount;
+ public long getErrorPartitionGauge() {
+ long total = 0;
+ for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+ total += entry.getValue().getErrorPartitionGauge();
+ }
+ return total;
}
@Override
- public long getTotalPartitionsWithoutTopStateCount() {
- return _totalPartitionsWithoutTopStateCount;
+ public long getMissingTopStatePartitionGauge() {
+ long total = 0;
+ for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+ total += entry.getValue().getMissingTopStatePartitionGauge();
+ }
+ return total;
}
@Override
- public long getTotalExternalViewIdealStateMismatchPartitionCount() {
- return _totalExternalViewIdealStateMismatchPartitionCount;
- }
-
- synchronized void applyDeltaToTotalPartitionCount(long delta) {
- _totalPartitionCount += delta;
- }
-
- synchronized void applyDeltaToTotalErrorPartitionCount(long delta) {
- _totalErrorPartitionCount += delta;
- }
-
- synchronized void applyDeltaToTotalPartitionsWithoutTopStateCount(long delta) {
- _totalPartitionsWithoutTopStateCount += delta;
- }
-
- synchronized void applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(long delta) {
- _totalExternalViewIdealStateMismatchPartitionCount += delta;
+ public long getDifferenceWithIdealStateGauge() {
+ long total = 0;
+ for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+ total += entry.getValue().getDifferenceWithIdealStateGauge();
+ }
+ return total;
}
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index 6ace495..81600cb 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -84,20 +84,20 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider {
/**
* @return number of all partitions in this cluster
*/
- long getTotalPartitionCount();
+ long getTotalPartitionGauge();
/**
* @return number of all partitions in this cluster that have errors
*/
- long getTotalErrorPartitionCount();
+ long getErrorPartitionGauge();
/**
* @return number of all partitions in this cluster without any top-state replicas
*/
- long getTotalPartitionsWithoutTopStateCount();
+ long getMissingTopStatePartitionGauge();
/**
* @return number of all partitions in this cluster whose ExternalView and IdealState have discrepancies
*/
- long getTotalExternalViewIdealStateMismatchPartitionCount();
+ long getDifferenceWithIdealStateGauge();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 125257b..d4b46b1 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -55,15 +55,16 @@ public class ResourceMonitor extends DynamicMBeanProvider {
private SimpleDynamicMetric<Long> _successTopStateHandoffCounter;
private SimpleDynamicMetric<Long> _failedTopStateHandoffCounter;
private SimpleDynamicMetric<Long> _maxSinglePartitionTopStateHandoffDuration;
- private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge;
private SimpleDynamicMetric<Long> _totalMessageReceived;
+ // Histograms
+ private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge;
+
private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
private long _lastResetTime;
private final String _resourceName;
private final String _clusterName;
private final ObjectName _initObjectName;
- private ClusterStatusMonitor _clusterStatusMonitor;
@Override
public ResourceMonitor register() throws JMException {
@@ -89,70 +90,48 @@ public class ResourceMonitor extends DynamicMBeanProvider {
return this;
}
- @Override
- public synchronized void unregister() {
- super.unregister();
- // Also remove metrics propagated to aggregate metrics in ClusterStatusMonitor
- if (_clusterStatusMonitor != null) {
- _clusterStatusMonitor.applyDeltaToTotalPartitionCount(-_numOfPartitions.getValue());
- _clusterStatusMonitor.applyDeltaToTotalErrorPartitionCount(-_numOfErrorPartitions.getValue());
- _clusterStatusMonitor.applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(
- -_externalViewIdealStateDiff.getValue());
- _clusterStatusMonitor.applyDeltaToTotalPartitionsWithoutTopStateCount(-_numNonTopStatePartitions.getValue());
- }
- }
-
public enum MonitorState {
TOP_STATE
}
public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) {
- this(null, clusterName, resourceName, objectName);
- }
-
- public ResourceMonitor(ClusterStatusMonitor clusterStatusMonitor, String clusterName, String resourceName,
- ObjectName objectName) {
- if (clusterStatusMonitor == null) {
- _logger.warn("ResourceMonitor initialized without a reference to ClusterStatusMonitor (null): metrics will not "
- + "be aggregated at the cluster level.");
- }
- _clusterStatusMonitor = clusterStatusMonitor;
_clusterName = clusterName;
_resourceName = resourceName;
_initObjectName = objectName;
- _externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0l);
+ _externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L);
_numLoadRebalanceThrottledPartitions =
- new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0l);
+ new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0L);
_numRecoveryRebalanceThrottledPartitions =
- new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0l);
+ new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0L);
_numPendingLoadRebalancePartitions =
- new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0l);
+ new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0L);
_numPendingRecoveryRebalancePartitions =
- new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0l);
- _numLessReplicaPartitions = new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0l);
+ new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0L);
+ _numLessReplicaPartitions = new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0L);
_numLessMinActiveReplicaPartitions =
- new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0l);
- _numNonTopStatePartitions = new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0l);
- _numOfErrorPartitions = new SimpleDynamicMetric("ErrorPartitionGauge", 0l);
- _numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0l);
- _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0l);
+ new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0L);
+ _numNonTopStatePartitions = new SimpleDynamicMetric("MissingTopStatePartitionGauge", 0L);
+ _numOfErrorPartitions = new SimpleDynamicMetric("ErrorPartitionGauge", 0L);
+ _numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0L);
+ _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0L);
_partitionTopStateHandoffDurationGauge =
new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram(
new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
- _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0l);
+ _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0L);
_maxSinglePartitionTopStateHandoffDuration =
- new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0l);
- _failedTopStateHandoffCounter = new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0l);
- _successTopStateHandoffCounter = new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0l);
+ new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0L);
+ _failedTopStateHandoffCounter = new SimpleDynamicMetric("FailedTopStateHandoffCounter", 0L);
+ _successTopStateHandoffCounter = new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0L);
_successfulTopStateHandoffDurationCounter =
- new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0l);
+ new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0L);
}
@Override
public String getSensorName() {
- return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName, _tag, _resourceName);
+ return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName,
+ _tag, _resourceName);
}
public long getPartitionGauge() {
@@ -293,17 +272,6 @@ public class ResourceMonitor extends DynamicMBeanProvider {
}
}
- // Update cluster-level aggregate metrics in ClusterStatusMonitor
- if (_clusterStatusMonitor != null) {
- _clusterStatusMonitor.applyDeltaToTotalPartitionCount(partitions.size() - _numOfPartitions.getValue());
- _clusterStatusMonitor.applyDeltaToTotalErrorPartitionCount(
- numOfErrorPartitions - _numOfErrorPartitions.getValue());
- _clusterStatusMonitor.applyDeltaToTotalExternalViewIdealStateMismatchPartitionCount(
- numOfDiff - _externalViewIdealStateDiff.getValue());
- _clusterStatusMonitor.applyDeltaToTotalPartitionsWithoutTopStateCount(
- (partitions.size() - numOfPartitionWithTopState) - _numNonTopStatePartitions.getValue());
- }
-
// Update resource-level metrics
_numOfPartitions.updateValue((long) partitions.size());
_numOfErrorPartitions.updateValue(numOfErrorPartitions);
@@ -320,21 +288,18 @@ public class ResourceMonitor extends DynamicMBeanProvider {
}
private void resetGauges() {
- // Disable reset for the following gauges:
- // 1) Need the previous values for these gauges to compute delta for cluster-level metrics.
- // 2) These four gauges are reset every time updateResource is called anyway.
- //_numOfErrorPartitions.updateValue(0l);
- //_numNonTopStatePartitions.updateValue(0l);
- //_externalViewIdealStateDiff.updateValue(0l);
- //_numOfPartitionsInExternalView.updateValue(0l);
+ _numOfErrorPartitions.updateValue(0L);
+ _numNonTopStatePartitions.updateValue(0L);
+ _externalViewIdealStateDiff.updateValue(0L);
+ _numOfPartitionsInExternalView.updateValue(0L);
// The following gauges are computed each call to updateResource by way of looping so need to be reset.
- _numLessMinActiveReplicaPartitions.updateValue(0l);
- _numLessReplicaPartitions.updateValue(0l);
- _numPendingRecoveryRebalancePartitions.updateValue(0l);
- _numPendingLoadRebalancePartitions.updateValue(0l);
- _numRecoveryRebalanceThrottledPartitions.updateValue(0l);
- _numLoadRebalanceThrottledPartitions.updateValue(0l);
+ _numLessMinActiveReplicaPartitions.updateValue(0L);
+ _numLessReplicaPartitions.updateValue(0L);
+ _numPendingRecoveryRebalancePartitions.updateValue(0L);
+ _numPendingLoadRebalancePartitions.updateValue(0L);
+ _numRecoveryRebalanceThrottledPartitions.updateValue(0L);
+ _numLoadRebalanceThrottledPartitions.updateValue(0L);
}
public void updateStateHandoffStats(MonitorState monitorState, long duration, boolean succeeded) {
@@ -398,8 +363,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
public void resetMaxTopStateHandoffGauge() {
if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis()) {
- _maxSinglePartitionTopStateHandoffDuration.updateValue(0l);
+ _maxSinglePartitionTopStateHandoffDuration.updateValue(0L);
_lastResetTime = System.currentTimeMillis();
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/8a6ac8ff/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
index ba3b654..dfe5016 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterAggregateMetrics.java
@@ -61,10 +61,10 @@ public class TestClusterAggregateMetrics extends ZkIntegrationTestBase {
private static final int NUM_PARTITIONS = 5;
private static final int NUM_REPLICAS = 3;
- private static final String PARTITION_COUNT = "TotalPartitionCount";
- private static final String ERROR_PARTITION_COUNT = "TotalErrorPartitionCount";
- private static final String WITHOUT_TOPSTATE_COUNT = "TotalPartitionsWithoutTopStateCount";
- private static final String IS_EV_MISMATCH_COUNT = "TotalExternalViewIdealStateMismatchPartitionCount";
+ private static final String PARTITION_COUNT = "TotalPartitionGauge";
+ private static final String ERROR_PARTITION_COUNT = "ErrorPartitionGauge";
+ private static final String WITHOUT_TOPSTATE_COUNT = "MissingTopStatePartitionGauge";
+ private static final String IS_EV_MISMATCH_COUNT = "DifferenceWithIdealStateGauge";
private static final int START_PORT = 12918;
private static final String STATE_MODEL = "MasterSlave";
@@ -82,15 +82,11 @@ public class TestClusterAggregateMetrics extends ZkIntegrationTestBase {
public void beforeClass() throws Exception {
System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
- String namespace = "/" + CLUSTER_NAME;
- if (_gZkClient.exists(namespace)) {
- _gZkClient.deleteRecursively(namespace);
- }
_setupTool = new ClusterSetup(ZK_ADDR);
-
// setup storage cluster
_setupTool.addCluster(CLUSTER_NAME, true);
_setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, NUM_PARTITIONS, STATE_MODEL);
+
for (int i = 0; i < NUM_PARTICIPANTS; i++) {
String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
_setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
@@ -145,7 +141,7 @@ public class TestClusterAggregateMetrics extends ZkIntegrationTestBase {
}
@Test
- public void testAggregateMetrics() throws InterruptedException {
+ public void testAggregateMetrics() throws Exception {
// Everything should be up and running initially with 5 total partitions
updateMetrics();
Assert.assertEquals(_beanValueMap.get(PARTITION_COUNT), 5L);