Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2021/02/06 00:15:10 UTC

[GitHub] [helix] jiajunwang commented on a change in pull request #1628: Per Replica Throttle -- 2nd: messages classification and basic throttle application

jiajunwang commented on a change in pull request #1628:
URL: https://github.com/apache/helix/pull/1628#discussion_r570512232



##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -146,8 +153,8 @@ private MessageOutput compute(ClusterEvent event, Map<String, Resource> resource
           partitonMsgMap.put(partition, msgList);
         }
         MessageOutput resourceMsgOut =
-            throttlePerReplicaMessages(idealState, partitonMsgMap, bestPossibleStateOutput,
-                throttleController, retracedPartitionsState);
+            throttlePerReplicaMessages(idealState, currentStateOutput, partitonMsgMap,
+                bestPossibleStateOutput, dataCache, throttleController, retracedPartitionsState);

Review comment:
       Following up on what I said above: we should be able to pass only the resource-related information here. A whole cache object is not necessary. I understand this may not work out perfectly, but please give it a try and let me know if there is any blocker.

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -94,11 +100,13 @@ public void process(ClusterEvent event) throws Exception {
    * of possible pending state transitions does NOT go over the set threshold).
    * @param event
    * @param resourceMap
+   * @param currentStateOutput
    * @param selectedMessage
    * @param retracedResourceStateMap out
    */
   private MessageOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
-      MessageOutput selectedMessage, ResourcesStateMap retracedResourceStateMap) {
+      CurrentStateOutput currentStateOutput, MessageOutput selectedMessage,
+      ResourcesStateMap retracedResourceStateMap) {
     MessageOutput output = new MessageOutput();
 
     ResourceControllerDataProvider dataCache =

Review comment:
       One of the headaches in our legacy code is that we tend to pass a huge object to a private method and let it fetch whatever it needs. This generally goes against good design. Can we refine it here so that the following private method receives only the parameters it needs and nothing more?
   
   I know this will make the parameter list longer, but I strongly prefer a very clear parameter list. It leads to a better module design.
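
A minimal sketch of the narrower-parameter style requested above, assuming a helper that only needs a preference list and the set of enabled live instances (the two inputs behind the `activeList` computation in this PR). The class and method names are hypothetical and are not part of Helix.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration; none of these names come from the Helix code base.
final class NarrowParametersSketch {

  // Receives only the two collections it reads, instead of a data provider
  // object that happens to hold them.
  static Set<String> activeReplicas(List<String> preferenceList,
      Set<String> enabledLiveInstances) {
    Set<String> active = new HashSet<>(preferenceList);
    active.retainAll(enabledLiveInstances);
    return active;
  }

  public static void main(String[] args) {
    Set<String> active = activeReplicas(
        List.of("host1", "host2", "host3"), Set.of("host1", "host3", "host4"));
    System.out.println(active.size()); // prints 2
  }
}
```

With this shape the caller decides where the instance set comes from, and the helper can be unit tested without building a cache object.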

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -220,6 +271,224 @@ private MessageOutput throttlePerReplicaMessages(IdealState idealState,
     return output;
   }
 
+  private void propagateCountsTopDown(StateModelDefinition stateModelDef,
+      Map<String, Integer> expectedStateCountMap) {
+    // attribute state in higher priority to lower priority
+    List<String> stateList = stateModelDef.getStatesPriorityList();
+    if (stateList.size() <= 0) {
+      return;
+    }
+    int index = 0;
+    String prevState = stateList.get(index);
+    if (!expectedStateCountMap.containsKey(prevState)) {
+      expectedStateCountMap.put(prevState, 0);
+    }
+    while (true) {
+      if (index == stateList.size() - 1) {
+        break;
+      }
+      index++;
+      String curState = stateList.get(index);
+      String num = stateModelDef.getNumInstancesPerState(curState);
+      if ("-1".equals(num)) {
+        break;
+      }
+      Integer prevCnt = expectedStateCountMap.get(prevState);
+      expectedStateCountMap
+          .put(curState, prevCnt + expectedStateCountMap.getOrDefault(curState, 0));
+      prevState = curState;
+    }
+  }
+
+  private void getPartitionExpectedAndCurrentStateCountMap(Partition partition,
+      Map<String, List<String>> preferenceLists, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<String, String> currentStateMap,
+      Map<String, Integer> expectedStateCountMapOut, Map<String, Integer> currentStateCountsOut) {

Review comment:
       Two different pieces of work are done here. Splitting this into 2 methods may be a better plan.
   
   Also, a minor thing that I really dislike: the 2 output parameter names are not symmetric. One is ***MapOut and the other is just ***Out. Why is that?
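
One possible shape for the split, as a sketch only. It assumes the two computations can return their maps instead of filling output parameters, and it uses a hypothetical `stateCountRule` function as a stand-in for the `StateModelDefinition.getStateCountMap(...)` call in the diff. All names below are illustrative, not the PR's.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical sketch; method and parameter names are illustrative only.
final class SplitCountsSketch {

  // Expected side: the per-state counts the state model asks for, given the
  // number of active candidates and the replica count.
  static Map<String, Integer> computeExpectedStateCounts(List<String> preferenceList,
      Set<String> enabledLiveInstances, int replica,
      BiFunction<Integer, Integer, Map<String, Integer>> stateCountRule) {
    Set<String> active = new HashSet<>(preferenceList);
    active.retainAll(enabledLiveInstances);
    return new HashMap<>(stateCountRule.apply(active.size(), replica));
  }

  // Current side: how many replicas are in each state, skipping disabled instances.
  static Map<String, Integer> computeCurrentStateCounts(Map<String, String> currentStateMap,
      Set<String> disabledInstances) {
    Map<String, Integer> counts = new HashMap<>();
    for (Map.Entry<String, String> entry : currentStateMap.entrySet()) {
      if (!disabledInstances.contains(entry.getKey())) {
        counts.merge(entry.getValue(), 1, Integer::sum);
      }
    }
    return counts;
  }
}
```

The two method names mirror each other, which also addresses the naming asymmetry.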

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -220,6 +271,224 @@ private MessageOutput throttlePerReplicaMessages(IdealState idealState,
     return output;
   }
 
+  private void propagateCountsTopDown(StateModelDefinition stateModelDef,
+      Map<String, Integer> expectedStateCountMap) {
+    // attribute state in higher priority to lower priority
+    List<String> stateList = stateModelDef.getStatesPriorityList();
+    if (stateList.size() <= 0) {
+      return;
+    }
+    int index = 0;
+    String prevState = stateList.get(index);
+    if (!expectedStateCountMap.containsKey(prevState)) {
+      expectedStateCountMap.put(prevState, 0);
+    }
+    while (true) {
+      if (index == stateList.size() - 1) {
+        break;
+      }
+      index++;
+      String curState = stateList.get(index);
+      String num = stateModelDef.getNumInstancesPerState(curState);
+      if ("-1".equals(num)) {
+        break;
+      }
+      Integer prevCnt = expectedStateCountMap.get(prevState);
+      expectedStateCountMap
+          .put(curState, prevCnt + expectedStateCountMap.getOrDefault(curState, 0));
+      prevState = curState;
+    }
+  }
+
+  private void getPartitionExpectedAndCurrentStateCountMap(Partition partition,
+      Map<String, List<String>> preferenceLists, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<String, String> currentStateMap,
+      Map<String, Integer> expectedStateCountMapOut, Map<String, Integer> currentStateCountsOut) {
+    List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
+    if (preferenceList == null) {
+      preferenceList = Collections.emptyList();
+    }
+
+    int replica =
+        idealState.getMinActiveReplicas() == -1 ? idealState.getReplicaCount(preferenceList.size())
+            : idealState.getMinActiveReplicas();
+    Set<String> activeList = new HashSet<>(preferenceList);
+    activeList.retainAll(cache.getEnabledLiveInstances());
+
+    // For each state, check that this partition currently has the required number of that state as
+    // required by StateModelDefinition.
+    String stateModelDefName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+    LinkedHashMap<String, Integer> expectedStateCountMap =
+        stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts
+
+    // Current counts without disabled partitions or disabled instances
+    Map<String, String> currentStateMapWithoutDisabled = new HashMap<>(currentStateMap);
+    currentStateMapWithoutDisabled.keySet().removeAll(cache
+        .getDisabledInstancesForPartition(idealState.getResourceName(),
+            partition.getPartitionName()));
+    Map<String, Integer> currentStateCounts =
+        StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
+
+    expectedStateCountMapOut.putAll(expectedStateCountMap);
+    currentStateCountsOut.putAll(currentStateCounts);
+    propagateCountsTopDown(stateModelDef, expectedStateCountMapOut);
+    propagateCountsTopDown(stateModelDef, currentStateCountsOut);
+  }
+
+  void calculateExistingAndCurrentStateCount(Map<Partition, List<Message>> selectedResourceMessages,
+      CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput,
+      IdealState idealState, ResourceControllerDataProvider cache,
+      Map<Partition, Map<String, Integer>> expectedStateCountByPartition,
+      Map<Partition, Map<String, Integer>> currentStateCountsByPartition) {
+    String resourceName = idealState.getResourceName();
+    Map<String, List<String>> preferenceLists =
+        bestPossibleStateOutput.getPreferenceLists(resourceName);
+    for (Partition partition : selectedResourceMessages.keySet()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+
+      Map<String, Integer> expectedStateCounts = new HashMap<>();
+      Map<String, Integer> currentStateCounts = new HashMap<>();
+      getPartitionExpectedAndCurrentStateCountMap(partition, preferenceLists, idealState, cache,
+          currentStateMap, expectedStateCounts, currentStateCounts);
+
+      // save these two maps for later usage
+      expectedStateCountByPartition.put(partition, expectedStateCounts);
+      currentStateCountsByPartition.put(partition, currentStateCounts);
+    }
+  }
+
+  /*
+   * Classify the messages of each partition into recovery and load messages.
+   */
+  private void classifyMessages(String resourceName, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<Partition, List<Message>> selectedResourceMessages,
+      List<Message> recoveryMessages, List<Message> loadMessages,
+      Map<Partition, Map<String, Integer>> expectedStateCountByPartition,
+      Map<Partition, Map<String, Integer>> currentStateCountsByPartition) {
+    LogUtil.logInfo(logger, _eventId,
+        String.format("Classify message for resource: %s", resourceName));
+
+    for (Partition partition : selectedResourceMessages.keySet()) {
+      Map<String, Integer> expectedStateCountMap = expectedStateCountByPartition.get(partition);
+      Map<String, Integer> currentStateCounts = currentStateCountsByPartition.get(partition);
+
+      List<Message> partitionMessages = selectedResourceMessages.get(partition);
+      if (partitionMessages == null) {
+        continue;
+      }
+
+      String stateModelDefName = idealState.getStateModelDefRef();
+      StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+      // sort partitionMessages based on transition priority and then creation timestamp for transition message
+      // TODO: sort messages in same partition in next PR
+      Set<String> disabledInstances =
+          cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName());
+      for (Message msg : partitionMessages) {
+        if (!Message.MessageType.STATE_TRANSITION.name().equals(msg.getMsgType())) {
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId, String
+                .format("Message: %s not subject to throttle in resource: %s with type %s", msg,
+                    resourceName, msg.getMsgType()));
+          }
+          continue;
+        }
+
+        boolean isUpward = !isDownwardTransition(idealState, cache, msg);
+
+        // for disabled disabled instance, the downward transition is not subjected to load throttling
+        // we will let them pass through ASAP.
+        String instance = msg.getTgtName();
+        if (disabledInstances.contains(instance)) {
+          if (!isUpward) {
+            if (logger.isDebugEnabled()) {
+              LogUtil.logDebug(logger, _eventId, String.format(
+                  "Message: %s not subject to throttle in resource: %s to disabled instancce %s",
+                  msg, resourceName, instance));
+            }
+            continue;
+          }
+        }
+
+        String toState = msg.getToState();
+        if (toState.equals(HelixDefinedState.DROPPED.name()) || toState
+            .equals(HelixDefinedState.ERROR.name())) {
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId, String
+                .format("Message: %s not subject to throttle in resource: %s with toState %s", msg,
+                    resourceName, toState));
+          }
+          continue;
+        }
+
+        Integer expectedCount = expectedStateCountMap.get(toState);
+        Integer currentCount = currentStateCounts.get(toState);
+        expectedCount = expectedCount == null ? 0 : expectedCount;
+        currentCount = currentCount == null ? 0 : currentCount;
+
+        if (isUpward && (currentCount < expectedCount)) {
+          recoveryMessages.add(msg);
+          currentStateCounts.put(toState, currentCount + 1);

Review comment:
       Why do we need to change currentStateCounts while classifying?
   
   I mean, I know the reason, but please add a comment here.
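
To illustrate one plausible reading of why the count is bumped, here is a simplified sketch. It assumes every message is an upward transition, represents each message by its target state only, and assumes anything not classified as recovery is treated as load (the quoted diff ends before that branch, so this is a guess). Unlike the PR, it works on a copy of the counts rather than mutating the input map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not the PR's implementation.
final class ClassifySketch {

  static List<String> classify(List<String> toStates, Map<String, Integer> expectedCounts,
      Map<String, Integer> currentCounts) {
    Map<String, Integer> running = new HashMap<>(currentCounts);
    List<String> labels = new ArrayList<>();
    for (String toState : toStates) {
      int expected = expectedCounts.getOrDefault(toState, 0);
      int current = running.getOrDefault(toState, 0);
      if (current < expected) {
        labels.add("RECOVERY");
        // This message claims one missing slot, so later messages to the
        // same state must see the slot as already taken.
        running.put(toState, current + 1);
      } else {
        labels.add("LOAD");
      }
    }
    return labels;
  }

  public static void main(String[] args) {
    List<String> labels = classify(List.of("SLAVE", "SLAVE", "SLAVE"),
        Map.of("SLAVE", 2), Map.of("SLAVE", 1));
    System.out.println(labels); // prints [RECOVERY, LOAD, LOAD]
  }
}
```

Without the increment, all three messages in the example would be labeled as recovery even though only one SLAVE replica is missing.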

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -220,6 +271,224 @@ private MessageOutput throttlePerReplicaMessages(IdealState idealState,
     return output;
   }
 
+  private void propagateCountsTopDown(StateModelDefinition stateModelDef,
+      Map<String, Integer> expectedStateCountMap) {
+    // attribute state in higher priority to lower priority
+    List<String> stateList = stateModelDef.getStatesPriorityList();
+    if (stateList.size() <= 0) {
+      return;
+    }
+    int index = 0;
+    String prevState = stateList.get(index);
+    if (!expectedStateCountMap.containsKey(prevState)) {
+      expectedStateCountMap.put(prevState, 0);
+    }
+    while (true) {
+      if (index == stateList.size() - 1) {
+        break;
+      }
+      index++;
+      String curState = stateList.get(index);
+      String num = stateModelDef.getNumInstancesPerState(curState);
+      if ("-1".equals(num)) {
+        break;
+      }
+      Integer prevCnt = expectedStateCountMap.get(prevState);
+      expectedStateCountMap
+          .put(curState, prevCnt + expectedStateCountMap.getOrDefault(curState, 0));
+      prevState = curState;
+    }
+  }
+
+  private void getPartitionExpectedAndCurrentStateCountMap(Partition partition,
+      Map<String, List<String>> preferenceLists, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<String, String> currentStateMap,
+      Map<String, Integer> expectedStateCountMapOut, Map<String, Integer> currentStateCountsOut) {
+    List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
+    if (preferenceList == null) {
+      preferenceList = Collections.emptyList();
+    }
+
+    int replica =
+        idealState.getMinActiveReplicas() == -1 ? idealState.getReplicaCount(preferenceList.size())
+            : idealState.getMinActiveReplicas();
+    Set<String> activeList = new HashSet<>(preferenceList);
+    activeList.retainAll(cache.getEnabledLiveInstances());
+
+    // For each state, check that this partition currently has the required number of that state as
+    // required by StateModelDefinition.
+    String stateModelDefName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+    LinkedHashMap<String, Integer> expectedStateCountMap =
+        stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts
+
+    // Current counts without disabled partitions or disabled instances
+    Map<String, String> currentStateMapWithoutDisabled = new HashMap<>(currentStateMap);
+    currentStateMapWithoutDisabled.keySet().removeAll(cache
+        .getDisabledInstancesForPartition(idealState.getResourceName(),
+            partition.getPartitionName()));
+    Map<String, Integer> currentStateCounts =
+        StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
+
+    expectedStateCountMapOut.putAll(expectedStateCountMap);
+    currentStateCountsOut.putAll(currentStateCounts);
+    propagateCountsTopDown(stateModelDef, expectedStateCountMapOut);
+    propagateCountsTopDown(stateModelDef, currentStateCountsOut);
+  }
+
+  void calculateExistingAndCurrentStateCount(Map<Partition, List<Message>> selectedResourceMessages,
+      CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput,
+      IdealState idealState, ResourceControllerDataProvider cache,
+      Map<Partition, Map<String, Integer>> expectedStateCountByPartition,
+      Map<Partition, Map<String, Integer>> currentStateCountsByPartition) {
+    String resourceName = idealState.getResourceName();
+    Map<String, List<String>> preferenceLists =
+        bestPossibleStateOutput.getPreferenceLists(resourceName);
+    for (Partition partition : selectedResourceMessages.keySet()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+
+      Map<String, Integer> expectedStateCounts = new HashMap<>();
+      Map<String, Integer> currentStateCounts = new HashMap<>();
+      getPartitionExpectedAndCurrentStateCountMap(partition, preferenceLists, idealState, cache,
+          currentStateMap, expectedStateCounts, currentStateCounts);
+
+      // save these two maps for later usage
+      expectedStateCountByPartition.put(partition, expectedStateCounts);
+      currentStateCountsByPartition.put(partition, currentStateCounts);
+    }
+  }
+
+  /*
+   * Classify the messages of each partition into recovery and load messages.
+   */
+  private void classifyMessages(String resourceName, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<Partition, List<Message>> selectedResourceMessages,
+      List<Message> recoveryMessages, List<Message> loadMessages,
+      Map<Partition, Map<String, Integer>> expectedStateCountByPartition,
+      Map<Partition, Map<String, Integer>> currentStateCountsByPartition) {
+    LogUtil.logInfo(logger, _eventId,
+        String.format("Classify message for resource: %s", resourceName));
+
+    for (Partition partition : selectedResourceMessages.keySet()) {
+      Map<String, Integer> expectedStateCountMap = expectedStateCountByPartition.get(partition);
+      Map<String, Integer> currentStateCounts = currentStateCountsByPartition.get(partition);
+
+      List<Message> partitionMessages = selectedResourceMessages.get(partition);
+      if (partitionMessages == null) {
+        continue;
+      }
+
+      String stateModelDefName = idealState.getStateModelDefRef();
+      StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+      // sort partitionMessages based on transition priority and then creation timestamp for transition message
+      // TODO: sort messages in same partition in next PR
+      Set<String> disabledInstances =
+          cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName());
+      for (Message msg : partitionMessages) {
+        if (!Message.MessageType.STATE_TRANSITION.name().equals(msg.getMsgType())) {
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId, String
+                .format("Message: %s not subject to throttle in resource: %s with type %s", msg,
+                    resourceName, msg.getMsgType()));
+          }
+          continue;
+        }
+
+        boolean isUpward = !isDownwardTransition(idealState, cache, msg);
+
+        // for disabled disabled instance, the downward transition is not subjected to load throttling

Review comment:
       nit, but this kind of comment really prevents reviewers from understanding your intention. Please revisit and fix it here, along with any other comments like this.

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -220,6 +271,224 @@ private MessageOutput throttlePerReplicaMessages(IdealState idealState,
     return output;
   }
 
+  private void propagateCountsTopDown(StateModelDefinition stateModelDef,
+      Map<String, Integer> expectedStateCountMap) {
+    // attribute state in higher priority to lower priority
+    List<String> stateList = stateModelDef.getStatesPriorityList();
+    if (stateList.size() <= 0) {
+      return;
+    }
+    int index = 0;
+    String prevState = stateList.get(index);
+    if (!expectedStateCountMap.containsKey(prevState)) {
+      expectedStateCountMap.put(prevState, 0);
+    }
+    while (true) {
+      if (index == stateList.size() - 1) {
+        break;
+      }
+      index++;
+      String curState = stateList.get(index);
+      String num = stateModelDef.getNumInstancesPerState(curState);
+      if ("-1".equals(num)) {
+        break;
+      }
+      Integer prevCnt = expectedStateCountMap.get(prevState);
+      expectedStateCountMap
+          .put(curState, prevCnt + expectedStateCountMap.getOrDefault(curState, 0));
+      prevState = curState;
+    }
+  }
+
+  private void getPartitionExpectedAndCurrentStateCountMap(Partition partition,
+      Map<String, List<String>> preferenceLists, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<String, String> currentStateMap,
+      Map<String, Integer> expectedStateCountMapOut, Map<String, Integer> currentStateCountsOut) {
+    List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
+    if (preferenceList == null) {
+      preferenceList = Collections.emptyList();
+    }
+
+    int replica =
+        idealState.getMinActiveReplicas() == -1 ? idealState.getReplicaCount(preferenceList.size())
+            : idealState.getMinActiveReplicas();
+    Set<String> activeList = new HashSet<>(preferenceList);
+    activeList.retainAll(cache.getEnabledLiveInstances());
+
+    // For each state, check that this partition currently has the required number of that state as
+    // required by StateModelDefinition.
+    String stateModelDefName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+    LinkedHashMap<String, Integer> expectedStateCountMap =
+        stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts
+
+    // Current counts without disabled partitions or disabled instances
+    Map<String, String> currentStateMapWithoutDisabled = new HashMap<>(currentStateMap);
+    currentStateMapWithoutDisabled.keySet().removeAll(cache
+        .getDisabledInstancesForPartition(idealState.getResourceName(),
+            partition.getPartitionName()));
+    Map<String, Integer> currentStateCounts =
+        StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
+
+    expectedStateCountMapOut.putAll(expectedStateCountMap);
+    currentStateCountsOut.putAll(currentStateCounts);
+    propagateCountsTopDown(stateModelDef, expectedStateCountMapOut);

Review comment:
       Will this work?
   
   >     LinkedHashMap<String, Integer> expectedStateCountMap =
   >         stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -220,6 +271,224 @@ private MessageOutput throttlePerReplicaMessages(IdealState idealState,
     return output;
   }
 
+  private void propagateCountsTopDown(StateModelDefinition stateModelDef,
+      Map<String, Integer> expectedStateCountMap) {
+    // attribute state in higher priority to lower priority
+    List<String> stateList = stateModelDef.getStatesPriorityList();
+    if (stateList.size() <= 0) {
+      return;
+    }
+    int index = 0;
+    String prevState = stateList.get(index);
+    if (!expectedStateCountMap.containsKey(prevState)) {
+      expectedStateCountMap.put(prevState, 0);
+    }
+    while (true) {
+      if (index == stateList.size() - 1) {
+        break;
+      }
+      index++;
+      String curState = stateList.get(index);
+      String num = stateModelDef.getNumInstancesPerState(curState);
+      if ("-1".equals(num)) {
+        break;
+      }
+      Integer prevCnt = expectedStateCountMap.get(prevState);
+      expectedStateCountMap
+          .put(curState, prevCnt + expectedStateCountMap.getOrDefault(curState, 0));
+      prevState = curState;
+    }
+  }
+
+  private void getPartitionExpectedAndCurrentStateCountMap(Partition partition,
+      Map<String, List<String>> preferenceLists, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<String, String> currentStateMap,
+      Map<String, Integer> expectedStateCountMapOut, Map<String, Integer> currentStateCountsOut) {
+    List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
+    if (preferenceList == null) {
+      preferenceList = Collections.emptyList();
+    }
+
+    int replica =
+        idealState.getMinActiveReplicas() == -1 ? idealState.getReplicaCount(preferenceList.size())
+            : idealState.getMinActiveReplicas();
+    Set<String> activeList = new HashSet<>(preferenceList);
+    activeList.retainAll(cache.getEnabledLiveInstances());
+
+    // For each state, check that this partition currently has the required number of that state as
+    // required by StateModelDefinition.
+    String stateModelDefName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+    LinkedHashMap<String, Integer> expectedStateCountMap =
+        stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts
+
+    // Current counts without disabled partitions or disabled instances
+    Map<String, String> currentStateMapWithoutDisabled = new HashMap<>(currentStateMap);
+    currentStateMapWithoutDisabled.keySet().removeAll(cache
+        .getDisabledInstancesForPartition(idealState.getResourceName(),
+            partition.getPartitionName()));
+    Map<String, Integer> currentStateCounts =
+        StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
+
+    expectedStateCountMapOut.putAll(expectedStateCountMap);
+    currentStateCountsOut.putAll(currentStateCounts);
+    propagateCountsTopDown(stateModelDef, expectedStateCountMapOut);
+    propagateCountsTopDown(stateModelDef, currentStateCountsOut);
+  }
+
+  void calculateExistingAndCurrentStateCount(Map<Partition, List<Message>> selectedResourceMessages,
+      CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput,
+      IdealState idealState, ResourceControllerDataProvider cache,
+      Map<Partition, Map<String, Integer>> expectedStateCountByPartition,
+      Map<Partition, Map<String, Integer>> currentStateCountsByPartition) {
+    String resourceName = idealState.getResourceName();
+    Map<String, List<String>> preferenceLists =
+        bestPossibleStateOutput.getPreferenceLists(resourceName);
+    for (Partition partition : selectedResourceMessages.keySet()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+
+      Map<String, Integer> expectedStateCounts = new HashMap<>();
+      Map<String, Integer> currentStateCounts = new HashMap<>();
+      getPartitionExpectedAndCurrentStateCountMap(partition, preferenceLists, idealState, cache,
+          currentStateMap, expectedStateCounts, currentStateCounts);
+
+      // save these two maps for later usage
+      expectedStateCountByPartition.put(partition, expectedStateCounts);
+      currentStateCountsByPartition.put(partition, currentStateCounts);
+    }
+  }
+
+  /*
+   * Classify the messages of each partition into recovery and load messages.
+   */
+  private void classifyMessages(String resourceName, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<Partition, List<Message>> selectedResourceMessages,
+      List<Message> recoveryMessages, List<Message> loadMessages,
+      Map<Partition, Map<String, Integer>> expectedStateCountByPartition,
+      Map<Partition, Map<String, Integer>> currentStateCountsByPartition) {
+    LogUtil.logInfo(logger, _eventId,
+        String.format("Classify message for resource: %s", resourceName));
+
+    for (Partition partition : selectedResourceMessages.keySet()) {
+      Map<String, Integer> expectedStateCountMap = expectedStateCountByPartition.get(partition);
+      Map<String, Integer> currentStateCounts = currentStateCountsByPartition.get(partition);
+
+      List<Message> partitionMessages = selectedResourceMessages.get(partition);
+      if (partitionMessages == null) {
+        continue;
+      }
+
+      String stateModelDefName = idealState.getStateModelDefRef();
+      StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+      // sort partitionMessages based on transition priority and then creation timestamp for transition message
+      // TODO: sort messages in same partition in next PR
+      Set<String> disabledInstances =
+          cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName());
+      for (Message msg : partitionMessages) {
+        if (!Message.MessageType.STATE_TRANSITION.name().equals(msg.getMsgType())) {
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId, String
+                .format("Message: %s not subject to throttle in resource: %s with type %s", msg,
+                    resourceName, msg.getMsgType()));
+          }
+          continue;
+        }
+
+        boolean isUpward = !isDownwardTransition(idealState, cache, msg);
+
+        // for disabled disabled instance, the downward transition is not subjected to load throttling
+        // we will let them pass through ASAP.
+        String instance = msg.getTgtName();
+        if (disabledInstances.contains(instance)) {
+          if (!isUpward) {
+            if (logger.isDebugEnabled()) {
+              LogUtil.logDebug(logger, _eventId, String.format(
+                  "Message: %s not subject to throttle in resource: %s to disabled instancce %s",
+                  msg, resourceName, instance));
+            }
+            continue;
+          }
+        }
+
+        String toState = msg.getToState();
+        if (toState.equals(HelixDefinedState.DROPPED.name()) || toState
+            .equals(HelixDefinedState.ERROR.name())) {
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId, String
+                .format("Message: %s not subject to throttle in resource: %s with toState %s", msg,
+                    resourceName, toState));
+          }
+          continue;
+        }
+
+        Integer expectedCount = expectedStateCountMap.get(toState);
+        Integer currentCount = currentStateCounts.get(toState);
+        expectedCount = expectedCount == null ? 0 : expectedCount;
+        currentCount = currentCount == null ? 0 : currentCount;
+
+        if (isUpward && (currentCount < expectedCount)) {

Review comment:
       The first sentence of the related issue is, "MIN_ACTIVE replica is only applied in DelayAutoRebalancer and not respected by throttling logic in Helix." So I think this logic is not what we want eventually. Are you planning to add it in a later PR? If so, then let's add a TODO here.

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -220,6 +271,224 @@ private MessageOutput throttlePerReplicaMessages(IdealState idealState,
     return output;
   }
 
+  private void propagateCountsTopDown(StateModelDefinition stateModelDef,
+      Map<String, Integer> expectedStateCountMap) {

Review comment:
       It is also used for the current state count map, right?

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -220,6 +271,224 @@ private MessageOutput throttlePerReplicaMessages(IdealState idealState,
     return output;
   }
 
+  private void propagateCountsTopDown(StateModelDefinition stateModelDef,

Review comment:
       Sorry, I don't really understand what this method does. Could you please add more comments?
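
For reference, a standalone sketch of what the loop in the quoted diff appears to compute: walking the states from highest to lowest priority, each state's count becomes its own count plus the accumulated count of the state before it, and the walk stops at the first state whose configured count is "-1". The `unboundedStates` set is a hypothetical stand-in for the `getNumInstancesPerState` check, and the state names in the example are assumed for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical re-statement of the loop in the diff, without Helix types.
final class PropagateCountsSketch {

  static void propagateCountsTopDown(List<String> statesByPriority,
      Set<String> unboundedStates, Map<String, Integer> counts) {
    if (statesByPriority.isEmpty()) {
      return;
    }
    String prev = statesByPriority.get(0);
    counts.putIfAbsent(prev, 0);
    for (int i = 1; i < statesByPriority.size(); i++) {
      String cur = statesByPriority.get(i);
      if (unboundedStates.contains(cur)) {
        break; // mirrors the "-1" check in the diff
      }
      // Cumulative count: replicas in this state or any higher-priority state.
      counts.put(cur, counts.get(prev) + counts.getOrDefault(cur, 0));
      prev = cur;
    }
  }

  public static void main(String[] args) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    counts.put("MASTER", 1);
    counts.put("SLAVE", 2);
    propagateCountsTopDown(List.of("MASTER", "SLAVE", "OFFLINE", "DROPPED"),
        Set.of("OFFLINE", "DROPPED"), counts);
    System.out.println(counts); // prints {MASTER=1, SLAVE=3}
  }
}
```

In the example, SLAVE ends at 3 because it now counts the replicas at SLAVE or above; OFFLINE and DROPPED are left untouched.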

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -220,6 +271,224 @@ private MessageOutput throttlePerReplicaMessages(IdealState idealState,
     return output;
   }
 
+  private void propagateCountsTopDown(StateModelDefinition stateModelDef,
+      Map<String, Integer> expectedStateCountMap) {
+    // attribute state in higher priority to lower priority
+    List<String> stateList = stateModelDef.getStatesPriorityList();
+    if (stateList.size() <= 0) {
+      return;
+    }
+    int index = 0;
+    String prevState = stateList.get(index);
+    if (!expectedStateCountMap.containsKey(prevState)) {
+      expectedStateCountMap.put(prevState, 0);
+    }
+    while (true) {
+      if (index == stateList.size() - 1) {
+        break;
+      }
+      index++;
+      String curState = stateList.get(index);
+      String num = stateModelDef.getNumInstancesPerState(curState);
+      if ("-1".equals(num)) {
+        break;
+      }
+      Integer prevCnt = expectedStateCountMap.get(prevState);
+      expectedStateCountMap
+          .put(curState, prevCnt + expectedStateCountMap.getOrDefault(curState, 0));
+      prevState = curState;
+    }
+  }
+
+  private void getPartitionExpectedAndCurrentStateCountMap(Partition partition,
+      Map<String, List<String>> preferenceLists, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<String, String> currentStateMap,
+      Map<String, Integer> expectedStateCountMapOut, Map<String, Integer> currentStateCountsOut) {
+    List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
+    if (preferenceList == null) {
+      preferenceList = Collections.emptyList();
+    }
+
+    int replica =
+        idealState.getMinActiveReplicas() == -1 ? idealState.getReplicaCount(preferenceList.size())
+            : idealState.getMinActiveReplicas();
+    Set<String> activeList = new HashSet<>(preferenceList);
+    activeList.retainAll(cache.getEnabledLiveInstances());
+
+    // For each state, check that this partition currently has the required number of that state as
+    // required by StateModelDefinition.
+    String stateModelDefName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+    LinkedHashMap<String, Integer> expectedStateCountMap =
+        stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts
+
+    // Current counts without disabled partitions or disabled instances
+    Map<String, String> currentStateMapWithoutDisabled = new HashMap<>(currentStateMap);
+    currentStateMapWithoutDisabled.keySet().removeAll(cache
+        .getDisabledInstancesForPartition(idealState.getResourceName(),
+            partition.getPartitionName()));
+    Map<String, Integer> currentStateCounts =
+        StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
+
+    expectedStateCountMapOut.putAll(expectedStateCountMap);
+    currentStateCountsOut.putAll(currentStateCounts);
+    propagateCountsTopDown(stateModelDef, expectedStateCountMapOut);
+    propagateCountsTopDown(stateModelDef, currentStateCountsOut);

Review comment:
       Why do we need to understand the state model for the current state counts?
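
   For context, the raw tally of current states in the quoted code is built from the instance-to-state map alone; only the later propagation step consults state priorities. A minimal sketch of such a tally, assuming a hypothetical helper named countStates (not a Helix method) that drops disabled instances before counting:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class CurrentStateTallySketch {
  // Hypothetical helper: counts how many enabled instances are in each state.
  // No state model is needed for this step.
  static Map<String, Integer> countStates(Map<String, String> instanceToState,
      Set<String> disabledInstances) {
    Map<String, Integer> counts = new HashMap<>();
    for (Map.Entry<String, String> entry : instanceToState.entrySet()) {
      if (disabledInstances.contains(entry.getKey())) {
        continue; // skip instances disabled for this partition
      }
      counts.merge(entry.getValue(), 1, Integer::sum);
    }
    return counts;
  }
}
```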

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -220,6 +271,224 @@ private MessageOutput throttlePerReplicaMessages(IdealState idealState,
     return output;
   }
 
+  private void propagateCountsTopDown(StateModelDefinition stateModelDef,
+      Map<String, Integer> expectedStateCountMap) {
+    // attribute state in higher priority to lower priority
+    List<String> stateList = stateModelDef.getStatesPriorityList();
+    if (stateList.size() <= 0) {
+      return;
+    }
+    int index = 0;
+    String prevState = stateList.get(index);
+    if (!expectedStateCountMap.containsKey(prevState)) {
+      expectedStateCountMap.put(prevState, 0);
+    }
+    while (true) {
+      if (index == stateList.size() - 1) {
+        break;
+      }
+      index++;
+      String curState = stateList.get(index);
+      String num = stateModelDef.getNumInstancesPerState(curState);
+      if ("-1".equals(num)) {
+        break;
+      }
+      Integer prevCnt = expectedStateCountMap.get(prevState);
+      expectedStateCountMap
+          .put(curState, prevCnt + expectedStateCountMap.getOrDefault(curState, 0));
+      prevState = curState;
+    }
+  }
+
+  private void getPartitionExpectedAndCurrentStateCountMap(Partition partition,
+      Map<String, List<String>> preferenceLists, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<String, String> currentStateMap,
+      Map<String, Integer> expectedStateCountMapOut, Map<String, Integer> currentStateCountsOut) {
+    List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
+    if (preferenceList == null) {
+      preferenceList = Collections.emptyList();
+    }
+
+    int replica =
+        idealState.getMinActiveReplicas() == -1 ? idealState.getReplicaCount(preferenceList.size())
+            : idealState.getMinActiveReplicas();
+    Set<String> activeList = new HashSet<>(preferenceList);
+    activeList.retainAll(cache.getEnabledLiveInstances());
+
+    // For each state, check that this partition currently has the required number of that state as
+    // required by StateModelDefinition.
+    String stateModelDefName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+    LinkedHashMap<String, Integer> expectedStateCountMap =
+        stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts
+
+    // Current counts without disabled partitions or disabled instances
+    Map<String, String> currentStateMapWithoutDisabled = new HashMap<>(currentStateMap);
+    currentStateMapWithoutDisabled.keySet().removeAll(cache
+        .getDisabledInstancesForPartition(idealState.getResourceName(),
+            partition.getPartitionName()));
+    Map<String, Integer> currentStateCounts =
+        StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
+
+    expectedStateCountMapOut.putAll(expectedStateCountMap);
+    currentStateCountsOut.putAll(currentStateCounts);
+    propagateCountsTopDown(stateModelDef, expectedStateCountMapOut);
+    propagateCountsTopDown(stateModelDef, currentStateCountsOut);
+  }
+
+  void calculateExistingAndCurrentStateCount(Map<Partition, List<Message>> selectedResourceMessages,
+      CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput,
+      IdealState idealState, ResourceControllerDataProvider cache,
+      Map<Partition, Map<String, Integer>> expectedStateCountByPartition,
+      Map<Partition, Map<String, Integer>> currentStateCountsByPartition) {
+    String resourceName = idealState.getResourceName();
+    Map<String, List<String>> preferenceLists =
+        bestPossibleStateOutput.getPreferenceLists(resourceName);
+    for (Partition partition : selectedResourceMessages.keySet()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+
+      Map<String, Integer> expectedStateCounts = new HashMap<>();
+      Map<String, Integer> currentStateCounts = new HashMap<>();
+      getPartitionExpectedAndCurrentStateCountMap(partition, preferenceLists, idealState, cache,
+          currentStateMap, expectedStateCounts, currentStateCounts);
+
+      // save these two maps for later usage
+      expectedStateCountByPartition.put(partition, expectedStateCounts);
+      currentStateCountsByPartition.put(partition, currentStateCounts);
+    }
+  }
+
+  /*
+   * Classify the messages of each partition into recovery and load messages.
+   */
+  private void classifyMessages(String resourceName, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<Partition, List<Message>> selectedResourceMessages,
+      List<Message> recoveryMessages, List<Message> loadMessages,
+      Map<Partition, Map<String, Integer>> expectedStateCountByPartition,
+      Map<Partition, Map<String, Integer>> currentStateCountsByPartition) {
+    LogUtil.logInfo(logger, _eventId,
+        String.format("Classify message for resource: %s", resourceName));
+
+    for (Partition partition : selectedResourceMessages.keySet()) {
+      Map<String, Integer> expectedStateCountMap = expectedStateCountByPartition.get(partition);
+      Map<String, Integer> currentStateCounts = currentStateCountsByPartition.get(partition);
+
+      List<Message> partitionMessages = selectedResourceMessages.get(partition);
+      if (partitionMessages == null) {
+        continue;
+      }
+
+      String stateModelDefName = idealState.getStateModelDefRef();
+      StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+      // sort partitionMessages based on transition priority and then creation timestamp for transition message
+      // TODO: sort messages in same partition in next PR
+      Set<String> disabledInstances =
+          cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName());
+      for (Message msg : partitionMessages) {
+        if (!Message.MessageType.STATE_TRANSITION.name().equals(msg.getMsgType())) {
+          if (logger.isDebugEnabled()) {
+            LogUtil.logDebug(logger, _eventId, String
+                .format("Message: %s not subject to throttle in resource: %s with type %s", msg,
+                    resourceName, msg.getMsgType()));
+          }
+          continue;
+        }
+
+        boolean isUpward = !isDownwardTransition(idealState, cache, msg);

Review comment:
       Haven't we decided to put only the "basic throttle" logic in this PR? The DownwardTransition logic should go in a later PR, right?

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -220,6 +271,224 @@ private MessageOutput throttlePerReplicaMessages(IdealState idealState,
     return output;
   }
 
+  private void propagateCountsTopDown(StateModelDefinition stateModelDef,
+      Map<String, Integer> expectedStateCountMap) {
+    // attribute state in higher priority to lower priority
+    List<String> stateList = stateModelDef.getStatesPriorityList();
+    if (stateList.size() <= 0) {
+      return;
+    }
+    int index = 0;
+    String prevState = stateList.get(index);
+    if (!expectedStateCountMap.containsKey(prevState)) {
+      expectedStateCountMap.put(prevState, 0);
+    }
+    while (true) {
+      if (index == stateList.size() - 1) {
+        break;
+      }
+      index++;
+      String curState = stateList.get(index);
+      String num = stateModelDef.getNumInstancesPerState(curState);
+      if ("-1".equals(num)) {
+        break;
+      }
+      Integer prevCnt = expectedStateCountMap.get(prevState);
+      expectedStateCountMap
+          .put(curState, prevCnt + expectedStateCountMap.getOrDefault(curState, 0));
+      prevState = curState;
+    }
+  }
+
+  private void getPartitionExpectedAndCurrentStateCountMap(Partition partition,
+      Map<String, List<String>> preferenceLists, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<String, String> currentStateMap,
+      Map<String, Integer> expectedStateCountMapOut, Map<String, Integer> currentStateCountsOut) {
+    List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
+    if (preferenceList == null) {
+      preferenceList = Collections.emptyList();
+    }
+
+    int replica =
+        idealState.getMinActiveReplicas() == -1 ? idealState.getReplicaCount(preferenceList.size())
+            : idealState.getMinActiveReplicas();
+    Set<String> activeList = new HashSet<>(preferenceList);
+    activeList.retainAll(cache.getEnabledLiveInstances());
+
+    // For each state, check that this partition currently has the required number of that state as
+    // required by StateModelDefinition.
+    String stateModelDefName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+    LinkedHashMap<String, Integer> expectedStateCountMap =
+        stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts
+
+    // Current counts without disabled partitions or disabled instances
+    Map<String, String> currentStateMapWithoutDisabled = new HashMap<>(currentStateMap);
+    currentStateMapWithoutDisabled.keySet().removeAll(cache
+        .getDisabledInstancesForPartition(idealState.getResourceName(),
+            partition.getPartitionName()));
+    Map<String, Integer> currentStateCounts =
+        StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
+
+    expectedStateCountMapOut.putAll(expectedStateCountMap);
+    currentStateCountsOut.putAll(currentStateCounts);
+    propagateCountsTopDown(stateModelDef, expectedStateCountMapOut);
+    propagateCountsTopDown(stateModelDef, currentStateCountsOut);
+  }
+
+  void calculateExistingAndCurrentStateCount(Map<Partition, List<Message>> selectedResourceMessages,
+      CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput,
+      IdealState idealState, ResourceControllerDataProvider cache,
+      Map<Partition, Map<String, Integer>> expectedStateCountByPartition,
+      Map<Partition, Map<String, Integer>> currentStateCountsByPartition) {
+    String resourceName = idealState.getResourceName();
+    Map<String, List<String>> preferenceLists =
+        bestPossibleStateOutput.getPreferenceLists(resourceName);
+    for (Partition partition : selectedResourceMessages.keySet()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+
+      Map<String, Integer> expectedStateCounts = new HashMap<>();
+      Map<String, Integer> currentStateCounts = new HashMap<>();

Review comment:
       I guess you want to use an ordered map here? Otherwise, what's the point of referring to the state model and reading the state priority in the later logic?
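
   To illustrate the ordered-map point: HashMap makes no guarantee about iteration order, whereas LinkedHashMap iterates in insertion order. A minimal sketch, assuming a hypothetical helper named orderByPriority (not part of the PR), that copies counts into a LinkedHashMap following the state priority list:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OrderedCountsSketch {
  // Hypothetical helper: returns a map whose iteration order follows
  // statesByPriority; states missing from counts are skipped.
  static Map<String, Integer> orderByPriority(List<String> statesByPriority,
      Map<String, Integer> counts) {
    Map<String, Integer> ordered = new LinkedHashMap<>();
    for (String state : statesByPriority) {
      Integer count = counts.get(state);
      if (count != null) {
        ordered.put(state, count);
      }
    }
    return ordered;
  }
}
```

   If the later logic only does lookups by state name, a plain HashMap is enough and the ordering concern does not apply.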

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/PerReplicaThrottleStage.java
##########
@@ -220,6 +271,224 @@ private MessageOutput throttlePerReplicaMessages(IdealState idealState,
     return output;
   }
 
+  private void propagateCountsTopDown(StateModelDefinition stateModelDef,
+      Map<String, Integer> expectedStateCountMap) {
+    // attribute state in higher priority to lower priority
+    List<String> stateList = stateModelDef.getStatesPriorityList();
+    if (stateList.size() <= 0) {
+      return;
+    }
+    int index = 0;
+    String prevState = stateList.get(index);
+    if (!expectedStateCountMap.containsKey(prevState)) {
+      expectedStateCountMap.put(prevState, 0);
+    }
+    while (true) {
+      if (index == stateList.size() - 1) {
+        break;
+      }
+      index++;
+      String curState = stateList.get(index);
+      String num = stateModelDef.getNumInstancesPerState(curState);
+      if ("-1".equals(num)) {
+        break;
+      }
+      Integer prevCnt = expectedStateCountMap.get(prevState);
+      expectedStateCountMap
+          .put(curState, prevCnt + expectedStateCountMap.getOrDefault(curState, 0));
+      prevState = curState;
+    }
+  }
+
+  private void getPartitionExpectedAndCurrentStateCountMap(Partition partition,
+      Map<String, List<String>> preferenceLists, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<String, String> currentStateMap,
+      Map<String, Integer> expectedStateCountMapOut, Map<String, Integer> currentStateCountsOut) {
+    List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
+    if (preferenceList == null) {
+      preferenceList = Collections.emptyList();
+    }
+
+    int replica =
+        idealState.getMinActiveReplicas() == -1 ? idealState.getReplicaCount(preferenceList.size())
+            : idealState.getMinActiveReplicas();
+    Set<String> activeList = new HashSet<>(preferenceList);
+    activeList.retainAll(cache.getEnabledLiveInstances());
+
+    // For each state, check that this partition currently has the required number of that state as
+    // required by StateModelDefinition.
+    String stateModelDefName = idealState.getStateModelDefRef();
+    StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
+    LinkedHashMap<String, Integer> expectedStateCountMap =
+        stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts
+
+    // Current counts without disabled partitions or disabled instances
+    Map<String, String> currentStateMapWithoutDisabled = new HashMap<>(currentStateMap);
+    currentStateMapWithoutDisabled.keySet().removeAll(cache
+        .getDisabledInstancesForPartition(idealState.getResourceName(),
+            partition.getPartitionName()));
+    Map<String, Integer> currentStateCounts =
+        StateModelDefinition.getStateCounts(currentStateMapWithoutDisabled);
+
+    expectedStateCountMapOut.putAll(expectedStateCountMap);
+    currentStateCountsOut.putAll(currentStateCounts);
+    propagateCountsTopDown(stateModelDef, expectedStateCountMapOut);
+    propagateCountsTopDown(stateModelDef, currentStateCountsOut);
+  }
+
+  void calculateExistingAndCurrentStateCount(Map<Partition, List<Message>> selectedResourceMessages,
+      CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput,
+      IdealState idealState, ResourceControllerDataProvider cache,
+      Map<Partition, Map<String, Integer>> expectedStateCountByPartition,
+      Map<Partition, Map<String, Integer>> currentStateCountsByPartition) {
+    String resourceName = idealState.getResourceName();
+    Map<String, List<String>> preferenceLists =
+        bestPossibleStateOutput.getPreferenceLists(resourceName);
+    for (Partition partition : selectedResourceMessages.keySet()) {
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
+
+      Map<String, Integer> expectedStateCounts = new HashMap<>();
+      Map<String, Integer> currentStateCounts = new HashMap<>();
+      getPartitionExpectedAndCurrentStateCountMap(partition, preferenceLists, idealState, cache,
+          currentStateMap, expectedStateCounts, currentStateCounts);
+
+      // save these two maps for later usage
+      expectedStateCountByPartition.put(partition, expectedStateCounts);
+      currentStateCountsByPartition.put(partition, currentStateCounts);
+    }
+  }
+
+  /*
+   * Classify the messages of each partition into recovery and load messages.
+   */
+  private void classifyMessages(String resourceName, IdealState idealState,
+      ResourceControllerDataProvider cache, Map<Partition, List<Message>> selectedResourceMessages,
+      List<Message> recoveryMessages, List<Message> loadMessages,
+      Map<Partition, Map<String, Integer>> expectedStateCountByPartition,
+      Map<Partition, Map<String, Integer>> currentStateCountsByPartition) {
+    LogUtil.logInfo(logger, _eventId,
+        String.format("Classify message for resource: %s", resourceName));
+
+    for (Partition partition : selectedResourceMessages.keySet()) {
+      Map<String, Integer> expectedStateCountMap = expectedStateCountByPartition.get(partition);
+      Map<String, Integer> currentStateCounts = currentStateCountsByPartition.get(partition);

Review comment:
       The names of these variables seem to be reversed.
   The current expectedStateCountMap is really expectedStateCountByPartition, and the current expectedStateCountByPartition is really the whole count map.



