You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/09 23:43:18 UTC

helix git commit: [HELIX-716] [HELIX] Make downward load balance also be subject to StateTransitionThrottleConfig

Repository: helix
Updated Branches:
  refs/heads/master 4c3ad2aec -> dd3be71c9


[HELIX-716] [HELIX] Make downward load balance also be subject to StateTransitionThrottleConfig

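Previously, when load balance was restricted to downward transitions only (because the number of partitions in error or recovery had reached the configured threshold), those downward transitions bypassed all throttling constraints. With this change, downward load-balance transitions are also subject to the throttling constraints defined by StateTransitionThrottleConfig.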


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/dd3be71c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/dd3be71c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/dd3be71c

Branch: refs/heads/master
Commit: dd3be71c9423eea8283bda6819beb76edecf1fb2
Parents: 4c3ad2a
Author: Hunter Lee <na...@gmail.com>
Authored: Mon Jul 9 13:14:24 2018 -0700
Committer: Hunter Lee <na...@gmail.com>
Committed: Mon Jul 9 16:42:14 2018 -0700

----------------------------------------------------------------------
 .../stages/IntermediateStateCalcStage.java      | 65 +++++++++++---------
 .../TestPartitionMovementThrottle.java          | 18 +++---
 2 files changed, 44 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/dd3be71c/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index e70e420..0f11ecd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -45,6 +45,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
+
     BestPossibleStateOutput bestPossibleStateOutput =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
@@ -53,8 +54,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     if (currentStateOutput == null || bestPossibleStateOutput == null || resourceMap == null
         || cache == null) {
       throw new StageException(String.format("Missing attributes in event: %s. "
-              + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache (%s)", event,
-          currentStateOutput, bestPossibleStateOutput, resourceMap, cache));
+              + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache (%s)",
+          event, currentStateOutput, bestPossibleStateOutput, resourceMap, cache));
     }
 
     IntermediateStateOutput intermediateStateOutput =
@@ -320,7 +321,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     if (clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance() != -1) {
       // ErrorOrRecovery is set
       threshold = clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance();
-      partitionCount += partitionsNeedRecovery.size(); // Only add this count when the threshold is set
+      partitionCount += partitionsNeedRecovery.size(); // Only add this count when the threshold is
+      // set
     } else {
       if (clusterConfig.getErrorPartitionThresholdForLoadBalance() != 0) {
         // 0 is the default value so the old threshold has been set
@@ -328,30 +330,14 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       }
     }
 
-    // Perform load balance only if the number of partitions in recovery and in error is less than
-    // the threshold
-    if (partitionCount < threshold) {
-      loadbalanceThrottledPartitions = loadRebalance(resource, currentStateOutput,
-          bestPossiblePartitionStateMap, throttleController, intermediatePartitionStateMap,
-          partitionsNeedLoadBalance, currentStateOutput.getCurrentStateMap(resourceName));
-    } else {
-      // Only allow dropping of replicas to happen (dropping does NOT need to be throttled) and skip
-      // load balance for this cycle
-      for (Partition partition : partitionsNeedLoadBalance) {
-        Map<String, String> currentStateMap =
-            currentStateOutput.getCurrentStateMap(resourceName, partition);
-        Map<String, String> bestPossibleMap =
-            bestPossiblePartitionStateMap.getPartitionMap(partition);
-        // Skip load balance by passing current state to intermediate state
-        intermediatePartitionStateMap.setState(partition, currentStateMap);
+    // Perform regular load balance only if the number of partitions in recovery and in error is
+    // less than the threshold. Otherwise, only allow downward-transition load balance
+    boolean onlyDownwardLoadBalance = partitionCount >= threshold;
 
-        // Check if this partition only has downward state transitions; if so, allow state
-        // transitions by setting it at bestPossibleState
-        if (isLoadBalanceDownwardForAllReplicas(currentStateMap, bestPossibleMap, stateModelDef)) {
-          intermediatePartitionStateMap.setState(partition, bestPossibleMap);
-        }
-      }
-    }
+    loadbalanceThrottledPartitions = loadRebalance(resource, currentStateOutput,
+        bestPossiblePartitionStateMap, throttleController, intermediatePartitionStateMap,
+        partitionsNeedLoadBalance, currentStateOutput.getCurrentStateMap(resourceName),
+        onlyDownwardLoadBalance, stateModelDef);
 
     if (clusterStatusMonitor != null) {
       clusterStatusMonitor.updateRebalancerStats(resourceName, partitionsNeedRecovery.size(),
@@ -507,13 +493,16 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
    * @param intermediatePartitionStateMap
    * @param partitionsNeedLoadbalance
    * @param currentStateMap
-   * @return a set of partitions that need to be load-balanced but did not due to throttling
+   * @param onlyDownwardLoadBalance true when only allowing downward transitions
+   * @param stateModelDef for determining whether a partition's transitions are strictly downward
+   * @return a set of partitions that need to be load-balanced but were not due to throttling
    */
   private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput currentStateOutput,
       PartitionStateMap bestPossiblePartitionStateMap,
       StateTransitionThrottleController throttleController,
       PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedLoadbalance,
-      Map<Partition, Map<String, String>> currentStateMap) {
+      Map<Partition, Map<String, String>> currentStateMap, boolean onlyDownwardLoadBalance,
+      StateModelDefinition stateModelDef) {
     String resourceName = resource.getResourceName();
     Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>();
 
@@ -532,7 +521,23 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     });
     Collections.sort(partitionsNeedLoadRebalancePrioritized, new PartitionPriorityComparator(
         bestPossiblePartitionStateMap.getStateMap(), currentStateMap, "", false));
+
     for (Partition partition : partitionsNeedLoadRebalancePrioritized) {
+      // If this is a downward load balance, check if the partition's transition is strictly
+      // downward
+      if (onlyDownwardLoadBalance) {
+        Map<String, String> currentStateMapForPartition =
+            currentStateOutput.getCurrentStateMap(resourceName, partition);
+        Map<String, String> bestPossibleMapForPartition =
+            bestPossiblePartitionStateMap.getPartitionMap(partition);
+        if (!isLoadBalanceDownwardForAllReplicas(currentStateMapForPartition,
+            bestPossibleMapForPartition, stateModelDef)) {
+          // For downward load balance, if a partition's transitions are not strictly downward,
+          // set intermediateState to currentState (i.e., skip load balance for this partition)
+          intermediatePartitionStateMap.setState(partition, currentStateMapForPartition);
+          continue;
+        }
+      }
       throttleStateTransitionsForPartition(throttleController, resourceName, partition,
           currentStateOutput, bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled,
           intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE);
@@ -586,7 +591,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
             hasReachedThrottlingLimit = true;
             if (logger.isDebugEnabled()) {
               logger.debug(
-                  "Throttled because of instance: {} for partition: {} in resource: {}" + instance,
+                  "Throttled because of instance: {} for partition: {} in resource: {}", instance,
                   partition.getPartitionName(), resourceName);
             }
             break;
@@ -834,4 +839,4 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       return matchedState;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/dd3be71c/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
index d37f9cf..49f110e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestPartitionMovementThrottle.java
@@ -109,17 +109,17 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase {
             StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 100);
 
 
-        StateTransitionThrottleConfig resourceRecoveryThrottle = new StateTransitionThrottleConfig(
-            StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
-            StateTransitionThrottleConfig.ThrottleScope.RESOURCE, 3);
+    StateTransitionThrottleConfig resourceRecoveryThrottle = new StateTransitionThrottleConfig(
+        StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+        StateTransitionThrottleConfig.ThrottleScope.RESOURCE, 3);
 
-        StateTransitionThrottleConfig clusterRecoveryThrottle = new StateTransitionThrottleConfig(
-            StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
-            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 100);
+    StateTransitionThrottleConfig clusterRecoveryThrottle = new StateTransitionThrottleConfig(
+        StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+        StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 100);
 
     clusterConfig.setStateTransitionThrottleConfigs(Arrays
         .asList(resourceLoadThrottle, instanceLoadThrottle, clusterLoadThrottle,
-    resourceRecoveryThrottle, clusterRecoveryThrottle));
+            resourceRecoveryThrottle, clusterRecoveryThrottle));
 
 
     clusterConfig.setPersistIntermediateAssignment(true);
@@ -362,7 +362,6 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase {
         "Throttle condition does not meet for " + throttledItemName);
   }
 
-
   private int size(List<PartitionTransitionTime> timeList) {
     Set<String> partitions = new HashSet<String>();
     for (PartitionTransitionTime p : timeList) {
@@ -382,7 +381,8 @@ public class TestPartitionMovementThrottle extends ZkStandAloneCMTestBase {
       this.end = end;
     }
 
-    @Override public String toString() {
+    @Override
+    public String toString() {
       return "[" +
           "partition='" + partition + '\'' +
           ", start=" + start +