You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/06/01 18:44:47 UTC

[helix] 06/07: Add test cases for replica level throttling (#1754)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit fbd901a16d46cb9bf90b558f6e14972432c2793e
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Wed May 26 12:57:08 2021 -0700

    Add test cases for replica level throttling (#1754)
    
    This commit contains the test skeleton and multiple test cases for:
    1. Sinlge top state throttling/non-throttling
    2. Multi top states throttling/non-throttling
---
 .../stages/TestReplicaLevelThrottling.java         | 263 +++++++++++++++++++
 .../TestReplicaLevelThrottling.MultiTopStates.json | 281 +++++++++++++++++++++
 .../TestReplicaLevelThrottling.SingleTopState.json | 280 ++++++++++++++++++++
 3 files changed, 824 insertions(+)

diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
new file mode 100644
index 0000000..0c1a705
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
@@ -0,0 +1,263 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestReplicaLevelThrottling extends BaseStageTest {
+  static final String CLUSTER_NAME = "TestCluster";
+  static final String RESOURCE_NAME = "TestResource";
+  static final String NOT_SET = "-1";
+
+  @Test(dataProvider = "replicaLevelThrottlingInput")
+  public void testPerReplicaThrottling(ClusterEvent event, Map<String, Map<String, String>> expectedOutput,
+      Map<String, Object> cacheMap, Mock mock) {
+    prepareCache(cacheMap, mock);
+    runStage(event, new IntermediateStateCalcStage());
+    Assert.assertTrue(matches(event, expectedOutput));
+  }
+
+  // Prepare the cache since it is well encapsulated, there is no way to set the cache things.
+  // Also if we move this piece in data loading, the temporary created mock object and ClusterConfig will be overrided
+  // by following test cases data.
+  private void prepareCache(Map<String, Object> cacheMap, Mock mock) {
+    when(mock.cache.getClusterConfig()).thenReturn((ClusterConfig) cacheMap.get(CacheKeys.clusterConfig.name()));
+    when(mock.cache.getStateModelDef((String) cacheMap.get(CacheKeys.stateModelName.name()))).thenReturn(
+        (StateModelDefinition) cacheMap.get(CacheKeys.stateModelDef.name()));
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(new HashSet<>(
+        ((Map<String, List<String>>) cacheMap.get(CacheKeys.preferenceList.name())).values().iterator().next()));
+    when(mock.cache.getLiveInstances()).thenReturn(new HashSet<>(
+        ((Map<String, List<String>>) cacheMap.get(CacheKeys.preferenceList.name())).values().iterator().next()).stream()
+        .collect(Collectors.toMap(e -> e, e -> new LiveInstance(e))));
+    when(mock.cache.getIdealState(RESOURCE_NAME)).thenReturn(
+        new FullAutoModeISBuilder(RESOURCE_NAME).setMinActiveReplica(
+            (Integer) cacheMap.get(CacheKeys.minActiveReplica.name()))
+            .setNumReplica((Integer) cacheMap.get(CacheKeys.numReplica.name()))
+            .setStateModel((String) cacheMap.get(CacheKeys.stateModelName.name()))
+            .setNumPartitions(2)
+            .setRebalancerMode(IdealState.RebalanceMode.FULL_AUTO)
+            .build());
+  }
+
+  private boolean matches(ClusterEvent event, Map<String, Map<String, String>> expectedOutPut) {
+    Map<Partition, Map<String, String>> intermediateResult =
+        ((IntermediateStateOutput) event.getAttribute(AttributeName.INTERMEDIATE_STATE.name())).getPartitionStateMap(
+            RESOURCE_NAME).getStateMap();
+    for (Partition partition : intermediateResult.keySet()) {
+      if (!expectedOutPut.containsKey(partition.getPartitionName()) || !expectedOutPut.get(partition.getPartitionName())
+          .equals(intermediateResult.get(partition))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @DataProvider(name = "replicaLevelThrottlingInput")
+  public Object[][] rebalanceStrategies() {
+    List<Object[]> data = new ArrayList<>();
+    data.addAll(loadTestInputs("TestReplicaLevelThrottling.SingleTopState.json"));
+    data.addAll(loadTestInputs("TestReplicaLevelThrottling.MultiTopStates.json"));
+
+    Object[][] ret = new Object[data.size()][];
+    for (int i = 0; i < data.size(); i++) {
+      ret[i] = data.get(i);
+    }
+    return ret;
+  }
+
+  enum Entry {
+    stateModel,
+    numReplica,
+    minActiveReplica,
+    testCases,
+
+    // Per Test
+    partitionNames,
+    messageOutput,           // instance -> target state of message
+    bestPossible,
+    preferenceList,
+    clusterThrottleLoad,
+    resourceThrottleLoad,
+    instanceThrottleLoad,
+    instanceThrottleRecovery,
+    currentStates,
+    pendingMessages,
+    expectedOutput
+  }
+
+  enum CacheKeys {
+    clusterConfig, stateModelName, stateModelDef, minActiveReplica, numReplica, preferenceList
+  }
+
+  public List<Object[]> loadTestInputs(String fileName) {
+    List<Object[]> ret = null;
+    InputStream inputStream = getClass().getClassLoader().getResourceAsStream(fileName);
+    try {
+      ObjectReader mapReader = new ObjectMapper().reader(Map.class);
+      Map<String, Object> inputMaps = mapReader.readValue(inputStream);
+      String stateModelName = (String) inputMaps.get(Entry.stateModel.name());
+
+      StateModelDefinition stateModelDef =
+          BuiltInStateModelDefinitions.valueOf(stateModelName).getStateModelDefinition();
+      int minActiveReplica = Integer.parseInt((String) inputMaps.get(Entry.minActiveReplica.name()));
+      int numReplica = Integer.parseInt((String) inputMaps.get(Entry.numReplica.name()));
+      List<Map<String, Object>> inputs = (List<Map<String, Object>>) inputMaps.get(Entry.testCases.name());
+      ret = new ArrayList<>();
+      Mock mock = new Mock();
+      for (Map<String, Object> inMap : inputs) {
+        Resource resource = new Resource(RESOURCE_NAME);
+        CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+        MessageOutput messageOutput = new MessageOutput();
+        BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+        Map<String, List<String>> preferenceLists = (Map<String, List<String>>) inMap.get(Entry.preferenceList.name());
+        Map<String, Map<String, String>> pendingMessages =
+            (Map<String, Map<String, String>>) inMap.get(Entry.pendingMessages.name());
+        Map<String, Map<String, String>> currentStates =
+            (Map<String, Map<String, String>>) inMap.get(Entry.currentStates.name());
+        Map<String, Map<String, String>> bestPossible =
+            (Map<String, Map<String, String>>) inMap.get(Entry.bestPossible.name());
+        Map<String, Map<String, String>> messageMap =
+            (Map<String, Map<String, String>>) inMap.get(Entry.messageOutput.name());
+        for (String partition : (List<String>) inMap.get(Entry.partitionNames.name())) {
+          resource.addPartition(partition);
+          bestPossibleStateOutput.setPreferenceList(RESOURCE_NAME, partition, preferenceLists.get(partition));
+          bestPossibleStateOutput.setState(RESOURCE_NAME, resource.getPartition(partition),
+              bestPossible.get(partition));
+          List<Message> messages = generateMessages(messageMap.get(partition), currentStates.get(partition));
+          messageOutput.addMessages(RESOURCE_NAME, resource.getPartition(partition), messages);
+          currentStates.get(partition)
+              .entrySet()
+              .forEach(
+                  e -> currentStateOutput.setCurrentState(resource.getResourceName(), resource.getPartition(partition),
+                      e.getKey(), e.getValue()));
+          generateMessages(pendingMessages.get(partition), currentStates.get(partition)).forEach(
+              m -> currentStateOutput.setPendingMessage(resource.getResourceName(), resource.getPartition(partition),
+                  m.getTgtName(), m));
+        }
+        ClusterEvent event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.Unknown);
+        event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); // add current states
+        event.addAttribute(AttributeName.ControllerDataProvider.name(),
+            buildCache(mock, numReplica, minActiveReplica, stateModelDef, stateModelName, preferenceLists));
+        event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageOutput);
+        event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+        event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+            Collections.singletonMap(RESOURCE_NAME, resource));
+        Map<String, Map<String, String>> expectedOutput =
+            (Map<String, Map<String, String>>) inMap.get(Entry.expectedOutput.name());
+
+        // Build throttle configs
+        ClusterConfig clusterConfig = new ClusterConfig(CLUSTER_NAME);
+        List<StateTransitionThrottleConfig> throttleConfigs = new ArrayList<>();
+        getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, Entry.clusterThrottleLoad.name(), throttleConfigs,
+            inMap);
+        getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.RESOURCE, Entry.resourceThrottleLoad.name(), throttleConfigs,
+            inMap);
+        getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.INSTANCE, Entry.instanceThrottleLoad.name(), throttleConfigs,
+            inMap);
+        getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+            StateTransitionThrottleConfig.ThrottleScope.INSTANCE, Entry.instanceThrottleRecovery.name(),
+            throttleConfigs, inMap);
+
+        clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
+
+        Map<String, Object> cacheMap = new HashMap<>();
+        cacheMap.put(CacheKeys.clusterConfig.name(), clusterConfig);
+        cacheMap.put(CacheKeys.stateModelName.name(), stateModelName);
+        cacheMap.put(CacheKeys.stateModelDef.name(), stateModelDef);
+        cacheMap.put(CacheKeys.preferenceList.name(), preferenceLists);
+        cacheMap.put(CacheKeys.minActiveReplica.name(), minActiveReplica);
+        cacheMap.put(CacheKeys.numReplica.name(), numReplica);
+        ret.add(new Object[]{event, expectedOutput, cacheMap, mock});
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    return ret;
+  }
+
+  private List<Message> generateMessages(Map<String, String> messageMap, Map<String, String> currentStates) {
+    if (messageMap == null || currentStates == null) {
+      return Collections.emptyList();
+    }
+    List<Message> messages = new ArrayList<>();
+
+    for (Map.Entry<String, String> entry : messageMap.entrySet()) {
+      Message message = new Message(new ZNRecord(UUID.randomUUID().toString()));
+      message.setFromState(currentStates.get(entry.getKey()));
+      message.setToState(entry.getValue());
+      message.setTgtName(entry.getKey());
+      messages.add(message);
+    }
+
+    return messages;
+  }
+
+  private ResourceControllerDataProvider buildCache(Mock mock, int numReplica, int minActive,
+      StateModelDefinition stateModelDefinition, String stateModel, Map<String, List<String>> preferenceLists) {
+
+    return mock.cache;
+  }
+
+  private void getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType rebalanceType,
+      StateTransitionThrottleConfig.ThrottleScope throttleScope, String entryName,
+      List<StateTransitionThrottleConfig> throttleConfigs, Map<String, Object> inMap) {
+    if (!inMap.get(entryName).equals(NOT_SET)) {
+      throttleConfigs.add(new StateTransitionThrottleConfig(rebalanceType, throttleScope,
+          Integer.valueOf((String) inMap.get(entryName))));
+    }
+  }
+
+  private final class Mock {
+    private ResourceControllerDataProvider cache = mock(ResourceControllerDataProvider.class);
+  }
+}
diff --git a/helix-core/src/test/resources/TestReplicaLevelThrottling.MultiTopStates.json b/helix-core/src/test/resources/TestReplicaLevelThrottling.MultiTopStates.json
new file mode 100644
index 0000000..35bdb6d
--- /dev/null
+++ b/helix-core/src/test/resources/TestReplicaLevelThrottling.MultiTopStates.json
@@ -0,0 +1,281 @@
+{
+  "stateModel": "OnlineOffline",
+  "minActiveReplica": "2",
+  "numReplica": "3",
+  "testCases": [
+    {
+      "description": "With pending message, still in the quota range, no throttling",
+      "clusterThrottleLoad": "-1",
+      "resourceThrottleLoad": "-1",
+      "instanceThrottleLoad": "2",
+      "instanceThrottleRecovery": "-1",
+      "partitionNames": ["partition_0", "partition_1"],
+      "messageOutput": {
+        "partition_0": {
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        }
+      },
+      "preferenceList": {
+        "partition_0": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_1": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ]
+      },
+      "currentStates": {
+        "partition_0": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "OFFLINE",
+          "localhost_12915": "OFFLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "OFFLINE"
+        }
+      },
+      "bestPossible": {
+        "partition_0": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        }
+      },
+      "pendingMessages": {
+        "partition_1": {
+          "localhost_12915": "ONLINE"
+        }
+      },
+      "expectedOutput": {
+        "partition_0": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        }
+      }
+    },
+    {
+      "description": "With pending message, only on message allow per instance. Messages to 12915 should be throttled",
+      "clusterThrottleLoad": "-1",
+      "resourceThrottleLoad": "-1",
+      "instanceThrottleLoad": "1",
+      "instanceThrottleRecovery": "-1",
+      "partitionNames": ["partition_0", "partition_1"],
+      "messageOutput": {
+        "partition_0": {
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        }
+      },
+      "preferenceList": {
+        "partition_0": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_1": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ]
+      },
+      "currentStates": {
+        "partition_0": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "OFFLINE",
+          "localhost_12915": "OFFLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "OFFLINE"
+        }
+      },
+      "bestPossible": {
+        "partition_0": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        }
+      },
+      "pendingMessages": {
+        "partition_1": {
+          "localhost_12915": "ONLINE"
+        }
+      },
+      "expectedOutput": {
+        "partition_0": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "OFFLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        }
+      }
+    },
+    {
+      "description": "Throttle happens at resource level with limit 1 and there is a topology change",
+      "clusterThrottleLoad": "-1",
+      "resourceThrottleLoad": "2",
+      "instanceThrottleLoad": "10",
+      "instanceThrottleRecovery": "-1",
+      "partitionNames": ["partition_0", "partition_1"],
+      "messageOutput": {
+        "partition_0": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        }
+      },
+      "preferenceList": {
+        "partition_0": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_1": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ]
+      },
+      "currentStates": {
+        "partition_0": {
+          "localhost_12913": "OFFLINE",
+          "localhost_12914": "OFFLINE",
+          "localhost_12915": "OFFLINE",
+          "localhost_12916": "ONLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "OFFLINE"
+        }
+      },
+      "bestPossible": {
+        "partition_0": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        }
+      },
+      "pendingMessages": {
+        "partition_1": {
+          "localhost_12915": "ONLINE"
+        }
+      },
+      "expectedOutput": {
+        "partition_0": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "OFFLINE",
+          "localhost_12916": "ONLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        }
+      }
+    },
+    {
+      "description": "Without pending message, for different priority of partitions, throttle results are different",
+      "clusterThrottleLoad": "-1",
+      "resourceThrottleLoad": "2",
+      "instanceThrottleLoad": "10",
+      "instanceThrottleRecovery": "-1",
+      "partitionNames": ["partition_0", "partition_1"],
+      "messageOutput": {
+        "partition_0": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        },
+        "partition_1": {
+          "localhost_12915": "ONLINE"
+        }
+      },
+      "preferenceList": {
+        "partition_0": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_1": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ]
+      },
+      "currentStates": {
+        "partition_0": {
+          "localhost_12913": "OFFLINE",
+          "localhost_12914": "OFFLINE",
+          "localhost_12915": "OFFLINE",
+          "localhost_12916": "ONLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "OFFLINE"
+        }
+      },
+      "bestPossible": {
+        "partition_0": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE"
+        }
+      },
+      "pendingMessages": {
+      },
+      "expectedOutput": {
+        "partition_0": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "ONLINE",
+          "localhost_12916": "ONLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "ONLINE",
+          "localhost_12914": "ONLINE",
+          "localhost_12915": "OFFLINE"
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json b/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
new file mode 100644
index 0000000..9706d16
--- /dev/null
+++ b/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
@@ -0,0 +1,280 @@
+{
+  "stateModel": "LeaderStandby",
+  "minActiveReplica": "2",
+  "numReplica": "3",
+  "testCases": [
+    {
+      "description": "With pending message, still in the quota range, no throttling",
+      "clusterThrottleLoad": "-1",
+      "resourceThrottleLoad": "-1",
+      "instanceThrottleLoad": "2",
+      "instanceThrottleRecovery": "-1",
+      "partitionNames": ["partition_0", "partition_1"],
+      "messageOutput": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "preferenceList": {
+        "partition_0": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_1": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ]
+      },
+      "currentStates": {
+        "partition_0": {
+          "localhost_12913": "STANDBY",
+          "localhost_12914": "OFFLINE",
+          "localhost_12915": "OFFLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "OFFLINE"
+        }
+      },
+      "bestPossible": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "pendingMessages": {
+        "partition_1": {
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "expectedOutput": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      }
+    },
+    {
+      "description": "With pending message, only on message allow per instance. Messages to 12915 should be throttled",
+      "clusterThrottleLoad": "-1",
+      "resourceThrottleLoad": "-1",
+      "instanceThrottleLoad": "1",
+      "instanceThrottleRecovery": "-1",
+      "partitionNames": ["partition_0", "partition_1"],
+      "messageOutput": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "preferenceList": {
+        "partition_0": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_1": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ]
+      },
+      "currentStates": {
+        "partition_0": {
+          "localhost_12913": "STANDBY",
+          "localhost_12914": "OFFLINE",
+          "localhost_12915": "OFFLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "OFFLINE"
+        }
+      },
+      "bestPossible": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "pendingMessages": {
+        "partition_1": {
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "expectedOutput": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "OFFLINE"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      }
+    },
+    {
+      "description": "Throttle happens at resource level with limit 1 and there is a topology change",
+      "clusterThrottleLoad": "-1",
+      "resourceThrottleLoad": "1",
+      "instanceThrottleLoad": "10",
+      "instanceThrottleRecovery": "-1",
+      "partitionNames": ["partition_0", "partition_1"],
+      "messageOutput": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "preferenceList": {
+        "partition_0": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_1": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ]
+      },
+      "currentStates": {
+        "partition_0": {
+          "localhost_12913": "STANDBY",
+          "localhost_12914": "OFFLINE",
+          "localhost_12915": "OFFLINE",
+          "localhost_12916": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "OFFLINE"
+        }
+      },
+      "bestPossible": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "pendingMessages": {
+        "partition_1": {
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "expectedOutput": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "OFFLINE",
+          "localhost_12915": "OFFLINE",
+          "localhost_12916": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      }
+    },
+    {
+      "description": "Without pending message, for different priority of partitions, throttle results are different",
+      "clusterThrottleLoad": "-1",
+      "resourceThrottleLoad": "1",
+      "instanceThrottleLoad": "10",
+      "instanceThrottleRecovery": "-1",
+      "partitionNames": ["partition_0", "partition_1"],
+      "messageOutput": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "preferenceList": {
+        "partition_0": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ],
+        "partition_1": [
+          "localhost_12913",
+          "localhost_12914",
+          "localhost_12915"
+        ]
+      },
+      "currentStates": {
+        "partition_0": {
+          "localhost_12913": "STANDBY",
+          "localhost_12914": "OFFLINE",
+          "localhost_12915": "OFFLINE",
+          "localhost_12916": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "OFFLINE"
+        }
+      },
+      "bestPossible": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "STANDBY"
+        }
+      },
+      "pendingMessages": {
+      },
+      "expectedOutput": {
+        "partition_0": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "OFFLINE",
+          "localhost_12916": "STANDBY"
+        },
+        "partition_1": {
+          "localhost_12913": "LEADER",
+          "localhost_12914": "STANDBY",
+          "localhost_12915": "OFFLINE"
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file