Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2021/04/23 00:07:30 UTC

[GitHub] [helix] jiajunwang commented on a change in pull request #1703: [Replica Level Throttle] Add per replica rebalance type compute logic

jiajunwang commented on a change in pull request #1703:
URL: https://github.com/apache/helix/pull/1703#discussion_r618820558



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -795,6 +798,55 @@ private RebalanceType getRebalanceType(ResourceControllerDataProvider cache,
     }
   }
 
+  private RebalanceType getRebalanceTypePerMessage(Map<String, Integer> requiredStates, Message message,

Review comment:
       Let's add some description of the parameters. It's hard to understand their intent from the parameter names alone.
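For illustration, the parameter descriptions could look like the Javadoc in this sketch. It is a simplified, self-contained stand-in rather than the PR's code: the Helix `Message` is reduced to its target state string, and the class name `RebalanceTypeDoc` is hypothetical. The loop is equivalent to the one in the diff.

```java
import java.util.HashMap;
import java.util.Map;

class RebalanceTypeDoc {
  enum RebalanceType { RECOVERY_BALANCE, LOAD_BALANCE }

  /**
   * Classifies a single state transition message of one partition.
   *
   * @param requiredStates       state name -> number of replicas that must be in that state
   * @param toState              target state of the message (stand-in for message.getToState())
   * @param derivedCurrentStates instance name -> (derived) current state of the partition there
   * @return RECOVERY_BALANCE if the message fills a required state that is still missing,
   *         LOAD_BALANCE otherwise
   */
  static RebalanceType getRebalanceTypePerMessage(Map<String, Integer> requiredStates,
      String toState, Map<String, String> derivedCurrentStates) {
    // Copy so the caller's map is not modified.
    Map<String, Integer> remaining = new HashMap<>(requiredStates);
    // Subtract replicas already in a required state; drop a state once it is fully covered.
    for (String state : derivedCurrentStates.values()) {
      Integer count = remaining.get(state);
      if (count != null) {
        if (count == 1) {
          remaining.remove(state);
        } else {
          remaining.put(state, count - 1);
        }
      }
    }
    // A target state still in the map is one the partition is missing: recovery.
    return remaining.containsKey(toState) ? RebalanceType.RECOVERY_BALANCE
        : RebalanceType.LOAD_BALANCE;
  }
}
```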

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -795,6 +798,55 @@ private RebalanceType getRebalanceType(ResourceControllerDataProvider cache,
     }
   }
 
+  private RebalanceType getRebalanceTypePerMessage(Map<String, Integer> requiredStates, Message message,
+      Map<String, String> derivedCurrentStates) {
+    Map<String, Integer> requiredStatesSnapshot = new HashMap<>(requiredStates);
+    // Looping existing current states to see whether current states fulfilled all the required states.
+    for (String state : derivedCurrentStates.values()) {
+      if (requiredStatesSnapshot.containsKey(state)) {
+        requiredStatesSnapshot.put(state, requiredStatesSnapshot.get(state) - 1);
+        if (requiredStatesSnapshot.get(state) == 0) {
+          requiredStatesSnapshot.remove(state);
+        }
+      }
+    }

Review comment:
       This calculation is repeated for every message when a partition has more than one message. Should we pre-calculate the requiredStatesSnapshot once (it is basically the gap between the derived current states and the required states) and then reuse it? We would need to update the snapshot once a message is confirmed, but that update would be much cheaper than looping over these two maps for every message.
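A rough sketch of the suggested pre-computation, assuming a message can be reduced to its from/to states. The names `RebalanceGapTracker`, `classify` and `onMessageConfirmed` are hypothetical. It keeps per-state replica counts built once per partition: a state is still "missing" exactly when its required count exceeds its current count, which is the condition the per-message loop tests, so each classification and each update is O(1).

```java
import java.util.HashMap;
import java.util.Map;

class RebalanceGapTracker {
  enum RebalanceType { RECOVERY_BALANCE, LOAD_BALANCE }

  private final Map<String, Integer> required;
  // state -> number of replicas currently (derived) in that state; built once per partition.
  private final Map<String, Integer> currentCounts = new HashMap<>();

  RebalanceGapTracker(Map<String, Integer> requiredStates, Map<String, String> derivedCurrentStates) {
    this.required = new HashMap<>(requiredStates);
    for (String state : derivedCurrentStates.values()) {
      currentCounts.merge(state, 1, Integer::sum);
    }
  }

  // Recovery iff the target state is still below its required count.
  RebalanceType classify(String toState) {
    int need = required.getOrDefault(toState, 0);
    int have = currentCounts.getOrDefault(toState, 0);
    return need > have ? RebalanceType.RECOVERY_BALANCE : RebalanceType.LOAD_BALANCE;
  }

  // Call once a message is confirmed so later messages of the same partition see its effect.
  void onMessageConfirmed(String fromState, String toState) {
    // Only decrement states we actually counted; remove the entry when it reaches zero.
    currentCounts.computeIfPresent(fromState, (state, count) -> count > 1 ? count - 1 : null);
    currentCounts.merge(toState, 1, Integer::sum);
  }
}
```

Tracking counts rather than the raw gap also handles the from-state side of a transition (e.g. SLAVE to MASTER frees a SLAVE slot), which a single decrement of the gap would miss.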

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -795,6 +798,55 @@ private RebalanceType getRebalanceType(ResourceControllerDataProvider cache,
     }
   }
 
+  private RebalanceType getRebalanceTypePerMessage(Map<String, Integer> requiredStates, Message message,
+      Map<String, String> derivedCurrentStates) {
+    Map<String, Integer> requiredStatesSnapshot = new HashMap<>(requiredStates);
+    // Looping existing current states to see whether current states fulfilled all the required states.
+    for (String state : derivedCurrentStates.values()) {
+      if (requiredStatesSnapshot.containsKey(state)) {
+        requiredStatesSnapshot.put(state, requiredStatesSnapshot.get(state) - 1);
+        if (requiredStatesSnapshot.get(state) == 0) {
+          requiredStatesSnapshot.remove(state);
+        }
+      }
+    }
+
+    // If the message is trying to bring the required state remaining in the map, it is recovery rebalance.
+    // Otherwise it is load rebalance.
+    return requiredStatesSnapshot.containsKey(message.getToState()) ? RebalanceType.RECOVERY_BALANCE
+        : RebalanceType.LOAD_BALANCE;
+  }
+
+  private Map<String, Integer> getRequiredStates(String resourceName,
+      ResourceControllerDataProvider resourceControllerDataProvider, List<String> preferenceList) {
+
+    // Prepare required inputs: 1) Priority State List 2) required number of replica

Review comment:
       nit: this duplicates existing code. Shall we extract a private method here? Or, if you plan to remove the other code in a follow-up PR, please add a TODO so we don't forget.
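One possible shape for the extraction, as a sketch only: the helper names are hypothetical, and the Helix types are replaced by the plain values the duplicated block reads (idealState.getMinActiveReplicas(), idealState.getReplicaCount(preferenceList.size()) and getEnabledLiveInstances()).

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ReplicaRequirementHelper {
  private ReplicaRequirementHelper() {}

  // -1 means "min active replicas not configured": fall back to the full replica count.
  static int requiredNumReplica(int minActiveReplicas, int replicaCount) {
    return minActiveReplicas == -1 ? replicaCount : minActiveReplicas;
  }

  // Instances in the preference list that are also enabled and live.
  static Set<String> activeInstances(List<String> preferenceList,
      Collection<String> enabledLiveInstances) {
    Set<String> active = new HashSet<>(preferenceList);
    active.retainAll(enabledLiveInstances);
    return active;
  }
}
```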

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -874,7 +926,31 @@ public int compare(ResourcePriority priority1, ResourcePriority priority2) {
     }
   }
 
-  // Compare partitions according following standard:
+  private class MessagePriorityComparator implements Comparator<Message> {
+    private Map<String, Integer> _preferenceInstanceMap;
+    private Map<String, Integer> _statePriorityMap;
+
+    MessagePriorityComparator(List<String> preferenceList, Map<String, Integer> statePriorityMap) {
+      // Get instance -> priority map.
+      _preferenceInstanceMap = IntStream.range(0, preferenceList.size())
+          .boxed()
+          .collect(Collectors.toMap(preferenceList::get, index -> index));
+      _statePriorityMap = statePriorityMap;
+    }
+
+    @Override
+    public int compare(Message m1, Message m2) {
+      //Compare rules:
+      //     1. Higher target state has higher priority.
+      //     2. If target state is same, range it as preference list order.
+      if (m1.getToState().equals(m2.getToState())) {
+        return _preferenceInstanceMap.get(m1.getTgtName()).compareTo(_preferenceInstanceMap.get(m2.getTgtName()));

Review comment:
       For safety and future extensibility, it may be a good idea to sort the messages by name as a last-resort tiebreaker.
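A minimal sketch of a comparator chain with a deterministic last-resort key. `PendingMessage` is a hypothetical stand-in for a Helix Message whose ranks are already resolved, and it assumes a lower state-priority value means a higher-priority target state; the message id plays the role of the "name".

```java
import java.util.Comparator;

final class MessageOrdering {
  private MessageOrdering() {}

  // Hypothetical stand-in for a Message with its ranks already looked up.
  static final class PendingMessage {
    final String msgId;
    final int statePriority;   // lower value = higher-priority target state (assumption)
    final int preferenceIndex; // position of the target instance in the preference list

    PendingMessage(String msgId, int statePriority, int preferenceIndex) {
      this.msgId = msgId;
      this.statePriority = statePriority;
      this.preferenceIndex = preferenceIndex;
    }
  }

  // 1) target state priority, 2) preference list order, 3) message id so ties never depend
  // on the input order.
  static final Comparator<PendingMessage> ORDER =
      Comparator.<PendingMessage>comparingInt(m -> m.statePriority)
          .thenComparingInt(m -> m.preferenceIndex)
          .thenComparing(m -> m.msgId);
}
```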

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -874,7 +926,31 @@ public int compare(ResourcePriority priority1, ResourcePriority priority2) {
     }
   }
 
-  // Compare partitions according following standard:
+  private class MessagePriorityComparator implements Comparator<Message> {
+    private Map<String, Integer> _preferenceInstanceMap;
+    private Map<String, Integer> _statePriorityMap;
+
+    MessagePriorityComparator(List<String> preferenceList, Map<String, Integer> statePriorityMap) {
+      // Get instance -> priority map.
+      _preferenceInstanceMap = IntStream.range(0, preferenceList.size())
+          .boxed()
+          .collect(Collectors.toMap(preferenceList::get, index -> index));
+      _statePriorityMap = statePriorityMap;
+    }
+
+    @Override
+    public int compare(Message m1, Message m2) {
+      //Compare rules:
+      //     1. Higher target state has higher priority.
+      //     2. If target state is same, range it as preference list order.
+      if (m1.getToState().equals(m2.getToState())) {
+        return _preferenceInstanceMap.get(m1.getTgtName()).compareTo(_preferenceInstanceMap.get(m2.getTgtName()));

Review comment:
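       A message might target an instance that does not exist in the preference list. In that case _preferenceInstanceMap.get() returns null and the compareTo() call throws a NullPointerException.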
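A sketch of one null-safe option, assuming it is acceptable for messages to unknown instances to sort after all known ones. `PreferenceRank` and its methods are hypothetical names; the index map is built the same way as in the diff, and Comparator.nullsLast handles the missing entries.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class PreferenceRank {
  private PreferenceRank() {}

  // instance -> index in the preference list.
  static Map<String, Integer> indexOf(List<String> preferenceList) {
    return IntStream.range(0, preferenceList.size())
        .boxed()
        .collect(Collectors.toMap(preferenceList::get, Function.identity()));
  }

  // Orders instance names by preference index; names missing from the list
  // (Map.get returns null) sort last instead of throwing a NullPointerException.
  static Comparator<String> byPreference(Map<String, Integer> rank) {
    return Comparator.comparing(rank::get, Comparator.nullsLast(Comparator.<Integer>naturalOrder()));
  }
}
```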

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -795,6 +798,55 @@ private RebalanceType getRebalanceType(ResourceControllerDataProvider cache,
     }
   }
 
+  private RebalanceType getRebalanceTypePerMessage(Map<String, Integer> requiredStates, Message message,
+      Map<String, String> derivedCurrentStates) {
+    Map<String, Integer> requiredStatesSnapshot = new HashMap<>(requiredStates);
+    // Looping existing current states to see whether current states fulfilled all the required states.
+    for (String state : derivedCurrentStates.values()) {
+      if (requiredStatesSnapshot.containsKey(state)) {
+        requiredStatesSnapshot.put(state, requiredStatesSnapshot.get(state) - 1);
+        if (requiredStatesSnapshot.get(state) == 0) {
+          requiredStatesSnapshot.remove(state);
+        }
+      }
+    }
+
+    // If the message is trying to bring the required state remaining in the map, it is recovery rebalance.
+    // Otherwise it is load rebalance.
+    return requiredStatesSnapshot.containsKey(message.getToState()) ? RebalanceType.RECOVERY_BALANCE
+        : RebalanceType.LOAD_BALANCE;
+  }
+
+  private Map<String, Integer> getRequiredStates(String resourceName,
+      ResourceControllerDataProvider resourceControllerDataProvider, List<String> preferenceList) {
+
+    // Prepare required inputs: 1) Priority State List 2) required number of replica
+    IdealState idealState = resourceControllerDataProvider.getIdealState(resourceName);
+    StateModelDefinition stateModelDefinition =
+        resourceControllerDataProvider.getStateModelDef(idealState.getStateModelDefRef());
+    int requiredNumReplica = idealState.getMinActiveReplicas() == -1
+        ? idealState.getReplicaCount(preferenceList.size())
+        : idealState.getMinActiveReplicas();
+    Set<String> activeList = new HashSet<>(preferenceList);
+    activeList.retainAll(resourceControllerDataProvider.getEnabledLiveInstances());
+
+    // For each state, check that this partition currently has the required number of that state as
+    // required by StateModelDefinition.
+    LinkedHashMap<String, Integer> expectedStateCountMap =

Review comment:
       Given MinActiveReplicas != -1, getStateCountMap() already returns a state count map that fits within the total min replica count. Do we really need the following calculation?

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
##########
@@ -795,6 +798,55 @@ private RebalanceType getRebalanceType(ResourceControllerDataProvider cache,
     }
   }
 
+  private RebalanceType getRebalanceTypePerMessage(Map<String, Integer> requiredStates, Message message,
+      Map<String, String> derivedCurrentStates) {
+    Map<String, Integer> requiredStatesSnapshot = new HashMap<>(requiredStates);
+    // Looping existing current states to see whether current states fulfilled all the required states.
+    for (String state : derivedCurrentStates.values()) {
+      if (requiredStatesSnapshot.containsKey(state)) {
+        requiredStatesSnapshot.put(state, requiredStatesSnapshot.get(state) - 1);
+        if (requiredStatesSnapshot.get(state) == 0) {
+          requiredStatesSnapshot.remove(state);
+        }
+      }
+    }
+
+    // If the message is trying to bring the required state remaining in the map, it is recovery rebalance.
+    // Otherwise it is load rebalance.
+    return requiredStatesSnapshot.containsKey(message.getToState()) ? RebalanceType.RECOVERY_BALANCE
+        : RebalanceType.LOAD_BALANCE;
+  }
+
+  private Map<String, Integer> getRequiredStates(String resourceName,
+      ResourceControllerDataProvider resourceControllerDataProvider, List<String> preferenceList) {
+
+    // Prepare required inputs: 1) Priority State List 2) required number of replica
+    IdealState idealState = resourceControllerDataProvider.getIdealState(resourceName);
+    StateModelDefinition stateModelDefinition =
+        resourceControllerDataProvider.getStateModelDef(idealState.getStateModelDefRef());
+    int requiredNumReplica = idealState.getMinActiveReplicas() == -1
+        ? idealState.getReplicaCount(preferenceList.size())
+        : idealState.getMinActiveReplicas();
+    Set<String> activeList = new HashSet<>(preferenceList);
+    activeList.retainAll(resourceControllerDataProvider.getEnabledLiveInstances());
+
+    // For each state, check that this partition currently has the required number of that state as
+    // required by StateModelDefinition.
+    LinkedHashMap<String, Integer> expectedStateCountMap =
+        stateModelDefinition.getStateCountMap(activeList.size(), requiredNumReplica); // StateModelDefinition's counts
+
+    Map<String, Integer> requiredStates = new HashMap<>();
+    for (String state : stateModelDefinition.getStatesPriorityList()) {
+      if (requiredNumReplica <= 0) {
+        break;
+      }
+
+      requiredStates.put(state, Math.min(requiredNumReplica, expectedStateCountMap.get(state)));
+      requiredNumReplica -= requiredStates.get(state);
+    }
+    return requiredStates;

Review comment:
       nit: could you please add a description or an example of the expected result here?
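A possible description, written as Javadoc on a self-contained copy of the loop. The counts in the example are hypothetical inputs, not values returned by Helix's getStateCountMap(), and the class name `RequiredStatesExample` is made up for illustration. Unlike the diff, this sketch skips states with no expected count so that the result holds only positive entries.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RequiredStatesExample {
  private RequiredStatesExample() {}

  /**
   * Hands out the replica budget to states in priority order.
   *
   * Example (hypothetical MasterSlave inputs): priority [MASTER, SLAVE, OFFLINE],
   * expected counts {MASTER=1, SLAVE=2}, requiredNumReplica = 2 returns {MASTER=1, SLAVE=1}.
   */
  static Map<String, Integer> requiredStates(List<String> statesPriorityList,
      LinkedHashMap<String, Integer> expectedStateCountMap, int requiredNumReplica) {
    Map<String, Integer> requiredStates = new HashMap<>();
    for (String state : statesPriorityList) {
      if (requiredNumReplica <= 0) {
        break; // budget exhausted: lower-priority states are not required
      }
      // Assumption: a state absent from the count map needs no replicas.
      int count = Math.min(requiredNumReplica, expectedStateCountMap.getOrDefault(state, 0));
      if (count > 0) {
        requiredStates.put(state, count);
        requiredNumReplica -= count;
      }
    }
    return requiredStates;
  }
}
```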




-- 
This is an automated message from the Apache Git Service.