You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/09/14 02:43:30 UTC

[helix] branch master updated: Fix error partition blocks load rebalance (#1867)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 99adc5f  Fix error partition blocks load rebalance (#1867)
99adc5f is described below

commit 99adc5fbfd6f887229fb70e60b31c03f189b7704
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Mon Sep 13 19:43:24 2021 -0700

    Fix error partition blocks load rebalance (#1867)
    
    There are four things fixed:
    1. State priority: a smaller number means a higher priority.
    2. When only downward transitions are allowed, any non-downward STs must be removed from the messages and throttled.
    3. Even downward STs should respect the throttling, to keep the behavior backward compatible.
    4. Fix test for TestErrorReplicaPersist
---
 .../stages/IntermediateStateCalcStage.java         |  11 +-
 .../stages/TestReplicaLevelThrottling.java         |  14 ++-
 .../helix/integration/TestErrorReplicaPersist.java |   7 ++
 .../TestReplicaLevelThrottling.SingleTopState.json | 115 +++++++++++++++++++++
 4 files changed, 141 insertions(+), 6 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index 1d2ed02..558a61f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -437,7 +437,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     // If the state is not found in statePriorityMap, consider it not strictly downward by
     // default because we can't determine whether it is downward
     if (statePriorityMap.containsKey(message.getFromState()) && statePriorityMap.containsKey(message.getToState())
-        && statePriorityMap.get(message.getFromState()) > statePriorityMap.get(message.getToState())) {
+        && statePriorityMap.get(message.getFromState()) < statePriorityMap.get(message.getToState())) {
       return true;
     }
     return false;
@@ -520,8 +520,13 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
        ResourceControllerDataProvider cache,
       boolean onlyDownwardLoadBalance, StateModelDefinition stateModelDefinition, Set<String> messagesThrottled,
       Map<Partition, List<Message>> resourceMessageMap) {
-    if (onlyDownwardLoadBalance && isLoadBalanceDownwardStateTransition(messageToThrottle, stateModelDefinition)) {
-      // Remove the message already allowed for downward state transitions.
+    // TODO: refactor this logic into the throttling code so that the throttling logic handles the only-downward restriction, including for recovery rebalance
+    // If only downward allowed: 1) any non-downward ST messages will be throttled and removed.
+    //                           2) any downward ST messages will respect the throttling.
+    // If not only downward allowed, all ST messages should respect the throttling.
+    if (onlyDownwardLoadBalance && !isLoadBalanceDownwardStateTransition(messageToThrottle, stateModelDefinition)) {
+      resourceMessageMap.get(partition).remove(messageToThrottle);
+      messagesThrottled.add(messageToThrottle.getId());
       return;
     }
     throttleStateTransitionsForReplica(throttleController, resource.getResourceName(), partition, messageToThrottle,
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
index 0c1a705..af47542 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
@@ -54,6 +54,7 @@ public class TestReplicaLevelThrottling extends BaseStageTest {
   static final String CLUSTER_NAME = "TestCluster";
   static final String RESOURCE_NAME = "TestResource";
   static final String NOT_SET = "-1";
+  static final String DEFAULT_ERROR_THRESHOLD = String.valueOf(Integer.MAX_VALUE);
 
   @Test(dataProvider = "replicaLevelThrottlingInput")
   public void testPerReplicaThrottling(ClusterEvent event, Map<String, Map<String, String>> expectedOutput,
@@ -128,11 +129,17 @@ public class TestReplicaLevelThrottling extends BaseStageTest {
     instanceThrottleRecovery,
     currentStates,
     pendingMessages,
-    expectedOutput
+    expectedOutput,
+    errorThreshold
   }
 
   enum CacheKeys {
-    clusterConfig, stateModelName, stateModelDef, minActiveReplica, numReplica, preferenceList
+    clusterConfig,
+    stateModelName,
+    stateModelDef,
+    minActiveReplica,
+    numReplica,
+    preferenceList
   }
 
   public List<Object[]> loadTestInputs(String fileName) {
@@ -206,8 +213,9 @@ public class TestReplicaLevelThrottling extends BaseStageTest {
         getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
             StateTransitionThrottleConfig.ThrottleScope.INSTANCE, Entry.instanceThrottleRecovery.name(),
             throttleConfigs, inMap);
-
         clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
+        clusterConfig.setErrorPartitionThresholdForLoadBalance(Integer.parseInt(
+            (String) inMap.getOrDefault(Entry.errorThreshold.name(), DEFAULT_ERROR_THRESHOLD)));
 
         Map<String, Object> cacheMap = new HashMap<>();
         cacheMap.put(CacheKeys.clusterConfig.name(), clusterConfig);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java b/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
index ff47a71..5c7fb86 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestErrorReplicaPersist.java
@@ -21,6 +21,7 @@ package org.apache.helix.integration;
 
 import java.util.Date;
 
+import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixRollbackException;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.TestHelper;
@@ -29,6 +30,7 @@ import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.rebalancer.TestAutoRebalance;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.StateMachineEngine;
@@ -97,6 +99,11 @@ public class TestErrorReplicaPersist extends ZkStandAloneCMTestBase {
 
   @Test
   public void testErrorReplicaPersist() throws InterruptedException {
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setErrorPartitionThresholdForLoadBalance(100000);
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    
     for (int i = 0; i < (NODE_NR + 1) / 2; i++) {
       _participants[i].syncStop();
       Thread.sleep(2000);
diff --git a/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json b/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
index 9706d16..5d4cdaf 100644
--- a/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
+++ b/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
@@ -275,6 +275,121 @@
           "localhost_12915": "OFFLINE"
         }
       }
+    },
+    {
+      "description": "Set error threshold, only downward allowed",
+      "clusterThrottleLoad": "-1",
+      "resourceThrottleLoad": "-1",
+      "instanceThrottleLoad": "10",
+      "instanceThrottleRecovery": "-1",
+      "errorThreshold" : "2",
+      "partitionNames": [
+        "partition_0",
+        "partition_1",
+        "partition_2",
+        "partition_3"
+      ],
+      "messageOutput": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12916": "OFFLINE"
+        },
+        "partition_1": {
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "preferenceList": {
+        "partition_0": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_1": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_2": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_3": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ]
+      },
+      "currentStates": {
+        "partition_0": {
+          "localhost_12913": "STANDBY",
+          "localhost_12914": "ERROR",
+          "localhost_12915": "STANDBY",
+          "localhost_12916": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "OFFLINE"
+        },
+        "partition_2": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "ERROR",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_3": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "ERROR",
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "bestPossible": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_2": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_3": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "pendingMessages": {
+      },
+      "expectedOutput": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "ERROR",
+          "localhost_12915": "STANDBY",
+          "localhost_12916": "OFFLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "OFFLINE"
+        },
+        "partition_2": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "ERROR",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_3": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "ERROR",
+          "localhost_12915": "STANDBY"
+        }
+      }
     }
   ]
 }
\ No newline at end of file