You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/01/20 18:38:06 UTC
[helix] branch master updated: Add metrics for rebalance throttled because of error partition
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new aa5d1477d Add metrics for rebalance throttled because of error partition
aa5d1477d is described below
commit aa5d1477def24ebe9c8a3c4a8ea2f502d64551f5
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Fri Jan 20 10:38:01 2023 -0800
Add metrics for rebalance throttled because of error partition
This change adds metrics for rebalances throttled because of ERROR-state partitions. It reports a gauge value at both resource and cluster granularity in the intermediate state calculation stage.
---
.../stages/IntermediateStateCalcStage.java | 5 +-
.../monitoring/mbeans/ClusterStatusMonitor.java | 16 ++-
.../mbeans/ClusterStatusMonitorMBean.java | 6 +
.../helix/monitoring/mbeans/ResourceMonitor.java | 14 ++-
.../stages/TestIntermediateStateCalcStage.java | 134 +++++++++++++++++++++
5 files changed, 165 insertions(+), 10 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index a7622b146..835b19be5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -28,7 +28,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -52,7 +51,6 @@ import org.apache.helix.model.Resource;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.monitoring.mbeans.ResourceMonitor;
-import org.apache.helix.participant.statemachine.StateModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -421,7 +419,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
if (clusterStatusMonitor != null) {
clusterStatusMonitor
.updateRebalancerStats(resourceName, messagesForRecovery.size(), messagesForLoad.size(),
- messagesThrottledForRecovery.size(), messagesThrottledForLoad.size());
+ messagesThrottledForRecovery.size(), messagesThrottledForLoad.size(),
+ onlyDownwardLoadBalance);
}
if (logger.isDebugEnabled()) {
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 0207753b8..bf0315d3a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -585,13 +585,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
public void updateRebalancerStats(String resourceName, long numPendingRecoveryRebalancePartitions,
long numPendingLoadRebalancePartitions, long numRecoveryRebalanceThrottledPartitions,
- long numLoadRebalanceThrottledPartitions) {
+ long numLoadRebalanceThrottledPartitions, boolean rebalanceThrottledByErrorPartitions) {
ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
if (resourceMonitor != null) {
resourceMonitor.updateRebalancerStats(numPendingRecoveryRebalancePartitions,
numPendingLoadRebalancePartitions, numRecoveryRebalanceThrottledPartitions,
- numLoadRebalanceThrottledPartitions);
+ numLoadRebalanceThrottledPartitions, rebalanceThrottledByErrorPartitions);
}
}
@@ -937,8 +937,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
}
- // For test only
- protected ResourceMonitor getResourceMonitor(String resourceName) {
+ public ResourceMonitor getResourceMonitor(String resourceName) {
return _resourceMonitorMap.get(resourceName);
}
@@ -1121,4 +1120,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
return total;
}
+
+ @Override
+ public long getNumOfResourcesRebalanceThrottledGauge() {
+ long total = 0;
+ for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
+ total += entry.getValue().getRebalanceThrottledByErrorPartitionGauge();
+ }
+ return total;
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index de2d7acbb..433818e45 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -141,4 +141,10 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider {
* @return number of pending state transitions in this cluster
*/
long getPendingStateTransitionGuage();
+
+ /**
+ * @return number of resources that will only perform downward state transitions because the number
+ * of ERROR-state partitions is larger than the configured threshold (default is 1).
+ */
+ long getNumOfResourcesRebalanceThrottledGauge();
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 51d332104..a7060f3c1 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -65,6 +65,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledReplicas;
private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledReplicas;
private SimpleDynamicMetric<Long> _numPendingStateTransitions;
+ private SimpleDynamicMetric<Long> _rebalanceThrottledByErrorPartitionGauge;
// Counters
private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter;
@@ -128,6 +129,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
_numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0L);
_numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0L);
_numPendingStateTransitions = new SimpleDynamicMetric("PendingStateTransitionGauge", 0L);
+ _rebalanceThrottledByErrorPartitionGauge =
+ new SimpleDynamicMetric("RebalanceThrottledByErrorPartitionGauge", 0L);
_partitionTopStateHandoffDurationGauge =
new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram(
@@ -371,11 +374,12 @@ public class ResourceMonitor extends DynamicMBeanProvider {
public void updateRebalancerStats(long numPendingRecoveryRebalancePartitions,
long numPendingLoadRebalancePartitions, long numRecoveryRebalanceThrottledPartitions,
- long numLoadRebalanceThrottledPartitions) {
+ long numLoadRebalanceThrottledPartitions, boolean rebalanceThrottledByErrorPartitions) {
_numPendingRecoveryRebalanceReplicas.updateValue(numPendingRecoveryRebalancePartitions);
_numPendingLoadRebalanceReplicas.updateValue(numPendingLoadRebalancePartitions);
_numRecoveryRebalanceThrottledReplicas.updateValue(numRecoveryRebalanceThrottledPartitions);
_numLoadRebalanceThrottledReplicas.updateValue(numLoadRebalanceThrottledPartitions);
+ _rebalanceThrottledByErrorPartitionGauge.updateValue(rebalanceThrottledByErrorPartitions? 1L : 0L);
}
/**
@@ -448,6 +452,10 @@ public class ResourceMonitor extends DynamicMBeanProvider {
return _rebalanceState.getValue();
}
+ public long getRebalanceThrottledByErrorPartitionGauge() {
+ return _rebalanceThrottledByErrorPartitionGauge.getValue();
+ }
+
public void resetMaxTopStateHandoffGauge() {
if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis()) {
_maxSinglePartitionTopStateHandoffDuration.updateValue(0L);
@@ -478,8 +486,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
_totalMessageReceived,
_totalMessageReceivedCounter,
_numPendingStateTransitions,
- _rebalanceState
- );
+ _rebalanceState,
+ _rebalanceThrottledByErrorPartitionGauge);
attributeList.addAll(_dynamicCapacityMetricsMap.values());
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index 7f17dea77..b651e0d28 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -31,6 +31,7 @@ import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -207,6 +208,9 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(resources, nPartition, "OnlineOffline"));
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
getResourceMap(resources, nPartition, "OnlineOffline"));
+ ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
+ monitor.active();
+ event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
// Initialize best possible state and current state
BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
@@ -264,6 +268,136 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+
+ // Validate that no resource has its load balance throttled
+ ClusterStatusMonitor clusterStatusMonitor =
+ event.getAttribute(AttributeName.clusterStatusMonitor.name());
+ Assert.assertEquals(clusterStatusMonitor.getNumOfResourcesRebalanceThrottledGauge(), 0);
+ Assert.assertEquals(clusterStatusMonitor.getResourceMonitor("resource_0")
+ .getRebalanceThrottledByErrorPartitionGauge(), 0);
+
+ for (String resource : resources) {
+ // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
+ // anything.
+ Assert.assertEquals(output.getPartitionStateMap(resource).getStateMap(),
+ expectedResult.getPartitionStateMap(resource).getStateMap());
+ }
+ }
+
+ @Test
+ public void testThrottleByErrorPartition() {
+ String resourcePrefix = "resource";
+ int nResource = 3;
+ int nPartition = 3;
+ int nReplica = 3;
+
+ String[] resources = new String[nResource];
+ for (int i = 0; i < nResource; i++) {
+ resources[i] = resourcePrefix + "_" + i;
+ }
+
+ preSetup(resources, nReplica, nReplica);
+ event.addAttribute(AttributeName.RESOURCES.name(),
+ getResourceMap(resources, nPartition, "OnlineOffline"));
+ event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+ getResourceMap(resources, nPartition, "OnlineOffline"));
+ ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
+ monitor.active();
+ event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
+
+ // Initialize best possible state and current state
+ BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+ MessageOutput messageSelectOutput = new MessageOutput();
+ CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ IntermediateStateOutput expectedResult = new IntermediateStateOutput();
+
+ _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(0);
+ setClusterConfig(_clusterConfig);
+
+ for (String resource : resources) {
+ IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource));
+ setSingleIdealState(is);
+
+ Map<String, List<String>> partitionMap = new HashMap<>();
+ for (int p = 0; p < nPartition; p++) {
+ Partition partition = new Partition(resource + "_" + p);
+ for (int r = 0; r < nReplica; r++) {
+ String instanceName = HOSTNAME_PREFIX + r;
+ partitionMap.put(partition.getPartitionName(), Collections.singletonList(instanceName));
+ // A resource with 2 partitions in ERROR state and one partition that needs recovery (OFFLINE->ONLINE).
+ // The error-state throttle won't block recovery rebalance.
+ if (resource.endsWith("0")) {
+ if (p <= 1) {
+ currentStateOutput.setCurrentState(resource, partition, instanceName, "ERROR");
+ bestPossibleStateOutput.setState(resource, partition, instanceName, "ERROR");
+ expectedResult.setState(resource, partition, instanceName, "ERROR");
+ } else {
+ currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
+ bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+ expectedResult.setState(resource, partition, instanceName, "OFFLINE");
+ if (r == 0) {
+ messageSelectOutput.addMessage(resource, partition,
+ generateMessage("OFFLINE", "ONLINE", instanceName));
+ expectedResult.setState(resource, partition, instanceName, "ONLINE");
+ }
+ }
+ } else if (resource.endsWith("1")) {
+ // A resource with 1 partition in ERROR state and the others needing load balance (OFFLINE->ONLINE).
+ // The error-state throttle will block load rebalance.
+ if (p <= 0) {
+ currentStateOutput.setCurrentState(resource, partition, instanceName, "ERROR");
+ bestPossibleStateOutput.setState(resource, partition, instanceName, "ERROR");
+ expectedResult.setState(resource, partition, instanceName, "ERROR");
+ } else {
+ if (r == 0) {
+ currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
+ bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+ expectedResult.setState(resource, partition, instanceName, "ONLINE");
+ } else {
+ // Even though there is a state transition message, it should be throttled
+ currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
+ bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+ messageSelectOutput.addMessage(resource, partition,
+ generateMessage("OFFLINE", "ONLINE", instanceName));
+ expectedResult.setState(resource, partition, instanceName, "OFFLINE");
+ }
+ }
+ } else {
+ // A resource that needs regular load balance
+ currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
+ currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
+ bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+ messageSelectOutput.addMessage(resource, partition,
+ generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
+ // should be recovered:
+ expectedResult.setState(resource, partition, instanceName, "ONLINE");
+ }
+ }
+ }
+ bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
+ }
+
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
+ event.addAttribute(AttributeName.ControllerDataProvider.name(),
+ new ResourceControllerDataProvider());
+ runStage(event, new ReadClusterDataStage());
+ runStage(event, new IntermediateStateCalcStage());
+
+ IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+
+ // Validate that 2 resources have their load balance throttled
+ ClusterStatusMonitor clusterStatusMonitor =
+ event.getAttribute(AttributeName.clusterStatusMonitor.name());
+ Assert.assertEquals(clusterStatusMonitor.getNumOfResourcesRebalanceThrottledGauge(), 2);
+ Assert.assertEquals(clusterStatusMonitor.getResourceMonitor("resource_0")
+ .getRebalanceThrottledByErrorPartitionGauge(), 1);
+ Assert.assertEquals(clusterStatusMonitor.getResourceMonitor("resource_1")
+ .getRebalanceThrottledByErrorPartitionGauge(), 1);
+ Assert.assertEquals(clusterStatusMonitor.getResourceMonitor("resource_2")
+ .getRebalanceThrottledByErrorPartitionGauge(), 0);
+
for (String resource : resources) {
// Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
// anything.