You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/01/20 18:38:06 UTC

[helix] branch master updated: Add metrics for rebalance throttled because of error partition

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new aa5d1477d Add metrics for rebalance throttled because of error partition
aa5d1477d is described below

commit aa5d1477def24ebe9c8a3c4a8ea2f502d64551f5
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Fri Jan 20 10:38:01 2023 -0800

    Add metrics for rebalance throttled because of error partition
    
    This change adds metrics for rebalance throttling caused by error partitions. A gauge value is reported at both resource and cluster granularity in the intermediate state calculation stage.
---
 .../stages/IntermediateStateCalcStage.java         |   5 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java    |  16 ++-
 .../mbeans/ClusterStatusMonitorMBean.java          |   6 +
 .../helix/monitoring/mbeans/ResourceMonitor.java   |  14 ++-
 .../stages/TestIntermediateStateCalcStage.java     | 134 +++++++++++++++++++++
 5 files changed, 165 insertions(+), 10 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index a7622b146..835b19be5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -28,7 +28,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -52,7 +51,6 @@ import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.monitoring.mbeans.ResourceMonitor;
-import org.apache.helix.participant.statemachine.StateModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -421,7 +419,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     if (clusterStatusMonitor != null) {
       clusterStatusMonitor
           .updateRebalancerStats(resourceName, messagesForRecovery.size(), messagesForLoad.size(),
-              messagesThrottledForRecovery.size(), messagesThrottledForLoad.size());
+              messagesThrottledForRecovery.size(), messagesThrottledForLoad.size(),
+              onlyDownwardLoadBalance);
     }
 
     if (logger.isDebugEnabled()) {
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 0207753b8..bf0315d3a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -585,13 +585,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
   public void updateRebalancerStats(String resourceName, long numPendingRecoveryRebalancePartitions,
       long numPendingLoadRebalancePartitions, long numRecoveryRebalanceThrottledPartitions,
-      long numLoadRebalanceThrottledPartitions) {
+      long numLoadRebalanceThrottledPartitions, boolean rebalanceThrottledByErrorPartitions) {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
     if (resourceMonitor != null) {
       resourceMonitor.updateRebalancerStats(numPendingRecoveryRebalancePartitions,
           numPendingLoadRebalancePartitions, numRecoveryRebalanceThrottledPartitions,
-          numLoadRebalanceThrottledPartitions);
+          numLoadRebalanceThrottledPartitions, rebalanceThrottledByErrorPartitions);
     }
   }
 
@@ -937,8 +937,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  // For test only
-  protected ResourceMonitor getResourceMonitor(String resourceName) {
+  public ResourceMonitor getResourceMonitor(String resourceName) {
     return _resourceMonitorMap.get(resourceName);
   }
 
@@ -1121,4 +1120,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
     return total;
   }
+
+  @Override
+  public long getNumOfResourcesRebalanceThrottledGauge() {
+    long total = 0;
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
+      total += entry.getValue().getRebalanceThrottledByErrorPartitionGauge();
+    }
+    return total;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index de2d7acbb..433818e45 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -141,4 +141,10 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider {
    * @return number of pending state transitions in this cluster
    */
   long getPendingStateTransitionGuage();
+
+  /**
+   * @return number of resources that will only perform downward state transitions because the
+   * number of ERROR state partitions is larger than the configured threshold (default is 1).
+   */
+  long getNumOfResourcesRebalanceThrottledGauge();
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index 51d332104..a7060f3c1 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -65,6 +65,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledReplicas;
   private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledReplicas;
   private SimpleDynamicMetric<Long> _numPendingStateTransitions;
+  private SimpleDynamicMetric<Long> _rebalanceThrottledByErrorPartitionGauge;
 
   // Counters
   private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter;
@@ -128,6 +129,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0L);
     _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0L);
     _numPendingStateTransitions = new SimpleDynamicMetric("PendingStateTransitionGauge", 0L);
+    _rebalanceThrottledByErrorPartitionGauge =
+        new SimpleDynamicMetric("RebalanceThrottledByErrorPartitionGauge", 0L);
 
     _partitionTopStateHandoffDurationGauge =
         new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram(
@@ -371,11 +374,12 @@ public class ResourceMonitor extends DynamicMBeanProvider {
 
   public void updateRebalancerStats(long numPendingRecoveryRebalancePartitions,
       long numPendingLoadRebalancePartitions, long numRecoveryRebalanceThrottledPartitions,
-      long numLoadRebalanceThrottledPartitions) {
+      long numLoadRebalanceThrottledPartitions, boolean rebalanceThrottledByErrorPartitions) {
     _numPendingRecoveryRebalanceReplicas.updateValue(numPendingRecoveryRebalancePartitions);
     _numPendingLoadRebalanceReplicas.updateValue(numPendingLoadRebalancePartitions);
     _numRecoveryRebalanceThrottledReplicas.updateValue(numRecoveryRebalanceThrottledPartitions);
     _numLoadRebalanceThrottledReplicas.updateValue(numLoadRebalanceThrottledPartitions);
+    _rebalanceThrottledByErrorPartitionGauge.updateValue(rebalanceThrottledByErrorPartitions? 1L : 0L);
   }
 
   /**
@@ -448,6 +452,10 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     return _rebalanceState.getValue();
   }
 
+  public long getRebalanceThrottledByErrorPartitionGauge() {
+    return _rebalanceThrottledByErrorPartitionGauge.getValue();
+  }
+
   public void resetMaxTopStateHandoffGauge() {
     if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis()) {
       _maxSinglePartitionTopStateHandoffDuration.updateValue(0L);
@@ -478,8 +486,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
         _totalMessageReceived,
         _totalMessageReceivedCounter,
         _numPendingStateTransitions,
-        _rebalanceState
-    );
+        _rebalanceState,
+        _rebalanceThrottledByErrorPartitionGauge);
 
     attributeList.addAll(_dynamicCapacityMetricsMap.values());
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index 7f17dea77..b651e0d28 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -31,6 +31,7 @@ import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -207,6 +208,9 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(resources, nPartition, "OnlineOffline"));
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
         getResourceMap(resources, nPartition, "OnlineOffline"));
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
+    monitor.active();
+    event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
 
     // Initialize best possible state and current state
     BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
@@ -264,6 +268,136 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
 
     IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
 
+
+    // Validate that 0 resources have their load balance throttled
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    Assert.assertEquals(clusterStatusMonitor.getNumOfResourcesRebalanceThrottledGauge(), 0);
+    Assert.assertEquals(clusterStatusMonitor.getResourceMonitor("resource_0")
+        .getRebalanceThrottledByErrorPartitionGauge(), 0);
+
+    for (String resource : resources) {
+      // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
+      // anything.
+      Assert.assertEquals(output.getPartitionStateMap(resource).getStateMap(),
+          expectedResult.getPartitionStateMap(resource).getStateMap());
+    }
+  }
+
+  @Test
+  public void testThrottleByErrorPartition() {
+    String resourcePrefix = "resource";
+    int nResource = 3;
+    int nPartition = 3;
+    int nReplica = 3;
+
+    String[] resources = new String[nResource];
+    for (int i = 0; i < nResource; i++) {
+      resources[i] = resourcePrefix + "_" + i;
+    }
+
+    preSetup(resources, nReplica, nReplica);
+    event.addAttribute(AttributeName.RESOURCES.name(),
+        getResourceMap(resources, nPartition, "OnlineOffline"));
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+        getResourceMap(resources, nPartition, "OnlineOffline"));
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
+    monitor.active();
+    event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
+
+    // Initialize best possible state and current state
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    MessageOutput messageSelectOutput = new MessageOutput();
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    IntermediateStateOutput expectedResult = new IntermediateStateOutput();
+
+    _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(0);
+    setClusterConfig(_clusterConfig);
+
+    for (String resource : resources) {
+      IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource));
+      setSingleIdealState(is);
+
+      Map<String, List<String>> partitionMap = new HashMap<>();
+      for (int p = 0; p < nPartition; p++) {
+        Partition partition = new Partition(resource + "_" + p);
+        for (int r = 0; r < nReplica; r++) {
+          String instanceName = HOSTNAME_PREFIX + r;
+          partitionMap.put(partition.getPartitionName(), Collections.singletonList(instanceName));
+          // A resource with 2 partitions in ERROR state and one that needs recovery (OFFLINE->ONLINE).
+          // The error state throttle won't block recovery rebalance.
+          if (resource.endsWith("0")) {
+            if (p <= 1) {
+              currentStateOutput.setCurrentState(resource, partition, instanceName, "ERROR");
+              bestPossibleStateOutput.setState(resource, partition, instanceName, "ERROR");
+              expectedResult.setState(resource, partition, instanceName, "ERROR");
+            } else {
+              currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
+              bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+              expectedResult.setState(resource, partition, instanceName, "OFFLINE");
+              if (r == 0) {
+                messageSelectOutput.addMessage(resource, partition,
+                    generateMessage("OFFLINE", "ONLINE", instanceName));
+                expectedResult.setState(resource, partition, instanceName, "ONLINE");
+              }
+            }
+          } else if (resource.endsWith("1")) {
+            // A resource with 1 partition in ERROR state and others that need load balance
+            // (OFFLINE->ONLINE). The error state throttle will block load rebalance.
+            if (p <= 0) {
+              currentStateOutput.setCurrentState(resource, partition, instanceName, "ERROR");
+              bestPossibleStateOutput.setState(resource, partition, instanceName, "ERROR");
+              expectedResult.setState(resource, partition, instanceName, "ERROR");
+            } else {
+              if (r == 0) {
+                currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
+                bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+                expectedResult.setState(resource, partition, instanceName, "ONLINE");
+              } else {
+                // even though there is ST msg, it should be throttled
+                currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
+                bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+                messageSelectOutput.addMessage(resource, partition,
+                    generateMessage("OFFLINE", "ONLINE", instanceName));
+                expectedResult.setState(resource, partition, instanceName, "OFFLINE");
+              }
+            }
+          } else {
+            // A resource that needs regular load balance
+            currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
+            currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
+            bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
+            messageSelectOutput.addMessage(resource, partition,
+                generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
+            // should be recovered:
+            expectedResult.setState(resource, partition, instanceName, "ONLINE");
+          }
+        }
+      }
+      bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
+    }
+
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(),
+        new ResourceControllerDataProvider());
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, new IntermediateStateCalcStage());
+
+    IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+
+    // Validate that 2 resources have their load balance throttled
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    Assert.assertEquals(clusterStatusMonitor.getNumOfResourcesRebalanceThrottledGauge(), 2);
+    Assert.assertEquals(clusterStatusMonitor.getResourceMonitor("resource_0")
+        .getRebalanceThrottledByErrorPartitionGauge(), 1);
+    Assert.assertEquals(clusterStatusMonitor.getResourceMonitor("resource_1")
+        .getRebalanceThrottledByErrorPartitionGauge(), 1);
+    Assert.assertEquals(clusterStatusMonitor.getResourceMonitor("resource_2")
+        .getRebalanceThrottledByErrorPartitionGauge(), 0);
+
     for (String resource : resources) {
       // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
       // anything.