You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/06/01 18:44:42 UTC
[helix] 01/07: [Replica Level Throttle] Add per replica rebalance
type compute logic (#1703)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit c04f355dd7de3d0c42fcb0e40f70b4de083afdef
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Fri Apr 23 14:58:48 2021 -0700
[Replica Level Throttle] Add per replica rebalance type compute logic (#1703)
* Add per replica rebalance type compute logic
Three functions added: 1) computation of the required states used to determine the rebalance type.
2) computation of the rebalance type per message.
3) message sorting rules and a comparator to determine which message to apply first.
---
.../stages/IntermediateStateCalcStage.java | 82 +++++++++++++++++++++-
1 file changed, 81 insertions(+), 1 deletion(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index c0defa9..a840d5e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -29,6 +29,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
@@ -43,6 +45,7 @@ import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.MaintenanceSignal;
+import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.StateModelDefinition;
@@ -796,6 +799,54 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
/**
+   * Classify one message as recovery or load rebalance, given the states that are still needed.
+   * <p>
+   * Every state found in {@code derivedCurrentStates} uses up one unit of the matching entry in a
+   * private copy of {@code desiredStates}; {@code desiredStates} itself is never modified. The
+   * message is recovery if its target state still has an entry left in that copy afterwards.
+   * @param desiredStates State -> how many replicas of that state are needed to guarantee the
+   *                      health of the replica set
+   * @param message The message whose rebalance type is to be determined
+   * @param derivedCurrentStates Current states derived by applying the previous messages that
+   *                             were not throttled; only its values (the states) are inspected
+   * @return {@code RebalanceType.RECOVERY_BALANCE} if the message's target state is still
+   *         required, otherwise {@code RebalanceType.LOAD_BALANCE}
+   */
+  private RebalanceType getRebalanceTypePerMessage(Map<String, Integer> desiredStates, Message message,
+      Map<String, String> derivedCurrentStates) {
+    // Work on a copy so the caller's requirement map is left untouched.
+    Map<String, Integer> unmetStates = new HashMap<>(desiredStates);
+
+    // Let every existing current state satisfy one unit of its requirement, if there is one.
+    for (String currentState : derivedCurrentStates.values()) {
+      if (!unmetStates.containsKey(currentState)) {
+        continue;
+      }
+      int stillNeeded = unmetStates.get(currentState);
+      if (stillNeeded == 1) {
+        // This replica covers the last outstanding unit, so the state is no longer required.
+        unmetStates.remove(currentState);
+      } else {
+        unmetStates.put(currentState, stillNeeded - 1);
+      }
+    }
+
+    // A message moving a replica into a still-required state is recovery rebalance.
+    // Otherwise, it is load balance.
+    if (unmetStates.containsKey(message.getToState())) {
+      return RebalanceType.RECOVERY_BALANCE;
+    }
+    return RebalanceType.LOAD_BALANCE;
+  }
+
+  /**
+   * Compute how many replicas of each state are required for one partition.
+   * <p>
+   * The replica target is the ideal state's min active replicas, or, when that value is -1, the
+   * ideal state's replica count for the size of the preference list. The candidate node count is
+   * the number of preference list entries that are contained in the enabled live instances.
+   * @param resourceName Name of the resource whose ideal state is looked up
+   * @param resourceControllerDataProvider Source of the ideal state, the state model definition
+   *                                       and the enabled live instances
+   * @param preferenceList Preference list of the partition
+   * @return State -> required count, as returned by the state model definition's
+   *         {@code getStateCountMap}
+   */
+  private Map<String, Integer> getRequiredStates(String resourceName,
+      ResourceControllerDataProvider resourceControllerDataProvider, List<String> preferenceList) {
+
+    // Prepare required inputs: 1) Priority State List 2) required number of replica
+    IdealState idealState = resourceControllerDataProvider.getIdealState(resourceName);
+    StateModelDefinition stateModelDefinition =
+        resourceControllerDataProvider.getStateModelDef(idealState.getStateModelDefRef());
+    int minActiveReplicas = idealState.getMinActiveReplicas();
+    int requiredNumReplica = minActiveReplicas == -1
+        ? idealState.getReplicaCount(preferenceList.size())
+        : minActiveReplicas;
+
+    // Fetch the enabled live instances once instead of once per preference list entry.
+    Set<String> enabledLiveInstances = resourceControllerDataProvider.getEnabledLiveInstances();
+    int candidateNodeNum = (int) preferenceList.stream().filter(enabledLiveInstances::contains).count();
+
+    // Generate a state mapping, state -> required numbers based on the live and enabled instances for this partition
+    // preference list
+    return stateModelDefinition.getStateCountMap(candidateNodeNum, requiredNumReplica);
+  }
+
+ /**
* Log rebalancer metadata for debugging purposes.
* @param resource
* @param allPartitions
@@ -874,7 +925,36 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
}
- // Compare partitions according following standard:
+ private class MessagePriorityComparator implements Comparator<Message> {
+ private Map<String, Integer> _preferenceInstanceMap;
+ private Map<String, Integer> _statePriorityMap;
+
+ MessagePriorityComparator(List<String> preferenceList, Map<String, Integer> statePriorityMap) {
+ // Get instance -> priority map.
+ _preferenceInstanceMap = IntStream.range(0, preferenceList.size())
+ .boxed()
+ .collect(Collectors.toMap(preferenceList::get, index -> index));
+ _statePriorityMap = statePriorityMap;
+ }
+
+ @Override
+ public int compare(Message m1, Message m2) {
+ // Compare rules:
+ // 1. Higher target state has higher priority.
+ // 2. If target state is same, range it as preference list order.
+ // 3. Sort by the name of targeted instances just for deterministic ordering.
+ if (m1.getToState().equals(m2.getToState()) && _preferenceInstanceMap.containsKey(m1.getTgtName())
+ && _preferenceInstanceMap.containsKey(m2.getTgtName())) {
+ return _preferenceInstanceMap.get(m1.getTgtName()).compareTo(_preferenceInstanceMap.get(m2.getTgtName()));
+ }
+ if (!m1.getToState().equals(m2.getToState())) {
+ return _statePriorityMap.get(m1.getToState()).compareTo(_statePriorityMap.get(m2.getToState()));
+ }
+ return m1.getTgtName().compareTo(m2.getTgtName());
+ }
+ }
+
+ // Compare partitions according to the following standard:
// 1) Partition without top state always is the highest priority.
// 2) For partition with top-state, the more number of active replica it has, the less priority.
private class PartitionPriorityComparator implements Comparator<Partition> {