You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/06/01 18:44:42 UTC

[helix] 01/07: [Replica Level Throttle] Add per replica rebalance type compute logic (#1703)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c04f355dd7de3d0c42fcb0e40f70b4de083afdef
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Fri Apr 23 14:58:48 2021 -0700

    [Replica Level Throttle] Add per replica rebalance type compute logic (#1703)
    
    * Add per replica rebalance type compute logic
    
    Three functions added: 1) computation of the required states used to decide the rebalance type.
                           2) rebalance type per message.
                           3) message sorting rules and comparator to determine which message to apply first.
---
 .../stages/IntermediateStateCalcStage.java         | 82 +++++++++++++++++++++-
 1 file changed, 81 insertions(+), 1 deletion(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index c0defa9..a840d5e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -29,6 +29,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -43,6 +45,7 @@ import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
@@ -796,6 +799,54 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
   }
 
   /**
+   * Classify a message as recovery or load rebalance from the required states that are still unmet.
+   * @param desiredStates         Required replica count per state to keep the replica set healthy
+   * @param message               The message whose rebalance type is being determined
+   * @param derivedCurrentStates  Current states derived by also counting earlier, non-throttled messages
+   * @return                      Recovery if the message's target state is still unmet, load otherwise.
+   */
+  private RebalanceType getRebalanceTypePerMessage(Map<String, Integer> desiredStates, Message message,
+      Map<String, String> derivedCurrentStates) {
+    // Copy first so the caller's map is never modified.
+    Map<String, Integer> unmetStates = new HashMap<>(desiredStates);
+    for (String heldState : derivedCurrentStates.values()) {
+      if (!unmetStates.containsKey(heldState)) {
+        continue;
+      }
+      // A replica already holding a required state uses up one slot of that state.
+      int remaining = unmetStates.get(heldState) - 1;
+      if (remaining == 0) {
+        unmetStates.remove(heldState);
+      } else {
+        unmetStates.put(heldState, remaining);
+      }
+    }
+    boolean targetStillUnmet = unmetStates.containsKey(message.getToState());
+    return targetStillUnmet ? RebalanceType.RECOVERY_BALANCE : RebalanceType.LOAD_BALANCE;
+  }
+
+  /**
+   * Compute state -> required count by passing the number of live and enabled preference-list
+   * instances and the required replica number to StateModelDefinition#getStateCountMap.
+   */
+  private Map<String, Integer> getRequiredStates(String resourceName,
+      ResourceControllerDataProvider resourceControllerDataProvider, List<String> preferenceList) {
+    IdealState idealState = resourceControllerDataProvider.getIdealState(resourceName);
+    StateModelDefinition stateModelDefinition =
+        resourceControllerDataProvider.getStateModelDef(idealState.getStateModelDefRef());
+    // A min active replica value of -1 falls back to the replica count for this preference list.
+    int requiredNumReplica = idealState.getMinActiveReplicas() == -1
+        ? idealState.getReplicaCount(preferenceList.size())
+        : idealState.getMinActiveReplicas();
+
+    // Fetch the enabled live instances once instead of once per preference-list entry.
+    Set<String> enabledLiveInstances = resourceControllerDataProvider.getEnabledLiveInstances();
+    int candidateCount = (int) preferenceList.stream().filter(enabledLiveInstances::contains).count();
+    // The returned map is the state model definition's own state -> count result.
+    return stateModelDefinition.getStateCountMap(candidateCount, requiredNumReplica);
+  }
+
+  /**
    * Log rebalancer metadata for debugging purposes.
    * @param resource
    * @param allPartitions
@@ -874,7 +925,36 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
     }
   }
 
-  // Compare partitions according following standard:
+  private class MessagePriorityComparator implements Comparator<Message> {
+    private Map<String, Integer> _preferenceInstanceMap;
+    private Map<String, Integer> _statePriorityMap;
+
+    MessagePriorityComparator(List<String> preferenceList, Map<String, Integer> statePriorityMap) {
+      // Get instance -> priority map.
+      _preferenceInstanceMap = IntStream.range(0, preferenceList.size())
+          .boxed()
+          .collect(Collectors.toMap(preferenceList::get, index -> index));
+      _statePriorityMap = statePriorityMap;
+    }
+
+    @Override
+    public int compare(Message m1, Message m2) {
+      //  Compare rules:
+      //     1. Higher target state has higher priority.
+      //     2. If target state is same, range it as preference list order.
+      //     3. Sort by the name of targeted instances just for deterministic ordering.
+      if (m1.getToState().equals(m2.getToState()) && _preferenceInstanceMap.containsKey(m1.getTgtName())
+          && _preferenceInstanceMap.containsKey(m2.getTgtName())) {
+        return _preferenceInstanceMap.get(m1.getTgtName()).compareTo(_preferenceInstanceMap.get(m2.getTgtName()));
+      }
+      if (!m1.getToState().equals(m2.getToState())) {
+        return _statePriorityMap.get(m1.getToState()).compareTo(_statePriorityMap.get(m2.getToState()));
+      }
+      return m1.getTgtName().compareTo(m2.getTgtName());
+    }
+  }
+
+  // Compare partitions according to the following standards:
   // 1) Partition without top state always is the highest priority.
   // 2) For partition with top-state, the more number of active replica it has, the less priority.
   private class PartitionPriorityComparator implements Comparator<Partition> {