You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/06/01 18:44:47 UTC
[helix] 06/07: Add test cases for replica level throttling (#1754)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit fbd901a16d46cb9bf90b558f6e14972432c2793e
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Wed May 26 12:57:08 2021 -0700
Add test cases for replica level throttling (#1754)
This commit contains the test skeleton and multiple test cases for:
1. Sinlge top state throttling/non-throttling
2. Multi top states throttling/non-throttling
---
.../stages/TestReplicaLevelThrottling.java | 263 +++++++++++++++++++
.../TestReplicaLevelThrottling.MultiTopStates.json | 281 +++++++++++++++++++++
.../TestReplicaLevelThrottling.SingleTopState.json | 280 ++++++++++++++++++++
3 files changed, 824 insertions(+)
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
new file mode 100644
index 0000000..0c1a705
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
@@ -0,0 +1,263 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.builder.FullAutoModeISBuilder;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestReplicaLevelThrottling extends BaseStageTest {
+ static final String CLUSTER_NAME = "TestCluster";
+ static final String RESOURCE_NAME = "TestResource";
+ static final String NOT_SET = "-1";
+
+ @Test(dataProvider = "replicaLevelThrottlingInput")
+ public void testPerReplicaThrottling(ClusterEvent event, Map<String, Map<String, String>> expectedOutput,
+ Map<String, Object> cacheMap, Mock mock) {
+ prepareCache(cacheMap, mock);
+ runStage(event, new IntermediateStateCalcStage());
+ Assert.assertTrue(matches(event, expectedOutput));
+ }
+
+ // Prepare the cache since it is well encapsulated, there is no way to set the cache things.
+ // Also if we move this piece in data loading, the temporary created mock object and ClusterConfig will be overrided
+ // by following test cases data.
+ private void prepareCache(Map<String, Object> cacheMap, Mock mock) {
+ when(mock.cache.getClusterConfig()).thenReturn((ClusterConfig) cacheMap.get(CacheKeys.clusterConfig.name()));
+ when(mock.cache.getStateModelDef((String) cacheMap.get(CacheKeys.stateModelName.name()))).thenReturn(
+ (StateModelDefinition) cacheMap.get(CacheKeys.stateModelDef.name()));
+ when(mock.cache.getEnabledLiveInstances()).thenReturn(new HashSet<>(
+ ((Map<String, List<String>>) cacheMap.get(CacheKeys.preferenceList.name())).values().iterator().next()));
+ when(mock.cache.getLiveInstances()).thenReturn(new HashSet<>(
+ ((Map<String, List<String>>) cacheMap.get(CacheKeys.preferenceList.name())).values().iterator().next()).stream()
+ .collect(Collectors.toMap(e -> e, e -> new LiveInstance(e))));
+ when(mock.cache.getIdealState(RESOURCE_NAME)).thenReturn(
+ new FullAutoModeISBuilder(RESOURCE_NAME).setMinActiveReplica(
+ (Integer) cacheMap.get(CacheKeys.minActiveReplica.name()))
+ .setNumReplica((Integer) cacheMap.get(CacheKeys.numReplica.name()))
+ .setStateModel((String) cacheMap.get(CacheKeys.stateModelName.name()))
+ .setNumPartitions(2)
+ .setRebalancerMode(IdealState.RebalanceMode.FULL_AUTO)
+ .build());
+ }
+
+ private boolean matches(ClusterEvent event, Map<String, Map<String, String>> expectedOutPut) {
+ Map<Partition, Map<String, String>> intermediateResult =
+ ((IntermediateStateOutput) event.getAttribute(AttributeName.INTERMEDIATE_STATE.name())).getPartitionStateMap(
+ RESOURCE_NAME).getStateMap();
+ for (Partition partition : intermediateResult.keySet()) {
+ if (!expectedOutPut.containsKey(partition.getPartitionName()) || !expectedOutPut.get(partition.getPartitionName())
+ .equals(intermediateResult.get(partition))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @DataProvider(name = "replicaLevelThrottlingInput")
+ public Object[][] rebalanceStrategies() {
+ List<Object[]> data = new ArrayList<>();
+ data.addAll(loadTestInputs("TestReplicaLevelThrottling.SingleTopState.json"));
+ data.addAll(loadTestInputs("TestReplicaLevelThrottling.MultiTopStates.json"));
+
+ Object[][] ret = new Object[data.size()][];
+ for (int i = 0; i < data.size(); i++) {
+ ret[i] = data.get(i);
+ }
+ return ret;
+ }
+
+ enum Entry {
+ stateModel,
+ numReplica,
+ minActiveReplica,
+ testCases,
+
+ // Per Test
+ partitionNames,
+ messageOutput, // instance -> target state of message
+ bestPossible,
+ preferenceList,
+ clusterThrottleLoad,
+ resourceThrottleLoad,
+ instanceThrottleLoad,
+ instanceThrottleRecovery,
+ currentStates,
+ pendingMessages,
+ expectedOutput
+ }
+
+ enum CacheKeys {
+ clusterConfig, stateModelName, stateModelDef, minActiveReplica, numReplica, preferenceList
+ }
+
+ public List<Object[]> loadTestInputs(String fileName) {
+ List<Object[]> ret = null;
+ InputStream inputStream = getClass().getClassLoader().getResourceAsStream(fileName);
+ try {
+ ObjectReader mapReader = new ObjectMapper().reader(Map.class);
+ Map<String, Object> inputMaps = mapReader.readValue(inputStream);
+ String stateModelName = (String) inputMaps.get(Entry.stateModel.name());
+
+ StateModelDefinition stateModelDef =
+ BuiltInStateModelDefinitions.valueOf(stateModelName).getStateModelDefinition();
+ int minActiveReplica = Integer.parseInt((String) inputMaps.get(Entry.minActiveReplica.name()));
+ int numReplica = Integer.parseInt((String) inputMaps.get(Entry.numReplica.name()));
+ List<Map<String, Object>> inputs = (List<Map<String, Object>>) inputMaps.get(Entry.testCases.name());
+ ret = new ArrayList<>();
+ Mock mock = new Mock();
+ for (Map<String, Object> inMap : inputs) {
+ Resource resource = new Resource(RESOURCE_NAME);
+ CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ MessageOutput messageOutput = new MessageOutput();
+ BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+ Map<String, List<String>> preferenceLists = (Map<String, List<String>>) inMap.get(Entry.preferenceList.name());
+ Map<String, Map<String, String>> pendingMessages =
+ (Map<String, Map<String, String>>) inMap.get(Entry.pendingMessages.name());
+ Map<String, Map<String, String>> currentStates =
+ (Map<String, Map<String, String>>) inMap.get(Entry.currentStates.name());
+ Map<String, Map<String, String>> bestPossible =
+ (Map<String, Map<String, String>>) inMap.get(Entry.bestPossible.name());
+ Map<String, Map<String, String>> messageMap =
+ (Map<String, Map<String, String>>) inMap.get(Entry.messageOutput.name());
+ for (String partition : (List<String>) inMap.get(Entry.partitionNames.name())) {
+ resource.addPartition(partition);
+ bestPossibleStateOutput.setPreferenceList(RESOURCE_NAME, partition, preferenceLists.get(partition));
+ bestPossibleStateOutput.setState(RESOURCE_NAME, resource.getPartition(partition),
+ bestPossible.get(partition));
+ List<Message> messages = generateMessages(messageMap.get(partition), currentStates.get(partition));
+ messageOutput.addMessages(RESOURCE_NAME, resource.getPartition(partition), messages);
+ currentStates.get(partition)
+ .entrySet()
+ .forEach(
+ e -> currentStateOutput.setCurrentState(resource.getResourceName(), resource.getPartition(partition),
+ e.getKey(), e.getValue()));
+ generateMessages(pendingMessages.get(partition), currentStates.get(partition)).forEach(
+ m -> currentStateOutput.setPendingMessage(resource.getResourceName(), resource.getPartition(partition),
+ m.getTgtName(), m));
+ }
+ ClusterEvent event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.Unknown);
+ event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); // add current states
+ event.addAttribute(AttributeName.ControllerDataProvider.name(),
+ buildCache(mock, numReplica, minActiveReplica, stateModelDef, stateModelName, preferenceLists));
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageOutput);
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+ event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+ Collections.singletonMap(RESOURCE_NAME, resource));
+ Map<String, Map<String, String>> expectedOutput =
+ (Map<String, Map<String, String>>) inMap.get(Entry.expectedOutput.name());
+
+ // Build throttle configs
+ ClusterConfig clusterConfig = new ClusterConfig(CLUSTER_NAME);
+ List<StateTransitionThrottleConfig> throttleConfigs = new ArrayList<>();
+ getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+ StateTransitionThrottleConfig.ThrottleScope.CLUSTER, Entry.clusterThrottleLoad.name(), throttleConfigs,
+ inMap);
+ getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+ StateTransitionThrottleConfig.ThrottleScope.RESOURCE, Entry.resourceThrottleLoad.name(), throttleConfigs,
+ inMap);
+ getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+ StateTransitionThrottleConfig.ThrottleScope.INSTANCE, Entry.instanceThrottleLoad.name(), throttleConfigs,
+ inMap);
+ getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+ StateTransitionThrottleConfig.ThrottleScope.INSTANCE, Entry.instanceThrottleRecovery.name(),
+ throttleConfigs, inMap);
+
+ clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
+
+ Map<String, Object> cacheMap = new HashMap<>();
+ cacheMap.put(CacheKeys.clusterConfig.name(), clusterConfig);
+ cacheMap.put(CacheKeys.stateModelName.name(), stateModelName);
+ cacheMap.put(CacheKeys.stateModelDef.name(), stateModelDef);
+ cacheMap.put(CacheKeys.preferenceList.name(), preferenceLists);
+ cacheMap.put(CacheKeys.minActiveReplica.name(), minActiveReplica);
+ cacheMap.put(CacheKeys.numReplica.name(), numReplica);
+ ret.add(new Object[]{event, expectedOutput, cacheMap, mock});
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ return ret;
+ }
+
+ private List<Message> generateMessages(Map<String, String> messageMap, Map<String, String> currentStates) {
+ if (messageMap == null || currentStates == null) {
+ return Collections.emptyList();
+ }
+ List<Message> messages = new ArrayList<>();
+
+ for (Map.Entry<String, String> entry : messageMap.entrySet()) {
+ Message message = new Message(new ZNRecord(UUID.randomUUID().toString()));
+ message.setFromState(currentStates.get(entry.getKey()));
+ message.setToState(entry.getValue());
+ message.setTgtName(entry.getKey());
+ messages.add(message);
+ }
+
+ return messages;
+ }
+
+ private ResourceControllerDataProvider buildCache(Mock mock, int numReplica, int minActive,
+ StateModelDefinition stateModelDefinition, String stateModel, Map<String, List<String>> preferenceLists) {
+
+ return mock.cache;
+ }
+
+ private void getSingleThrottleEntry(StateTransitionThrottleConfig.RebalanceType rebalanceType,
+ StateTransitionThrottleConfig.ThrottleScope throttleScope, String entryName,
+ List<StateTransitionThrottleConfig> throttleConfigs, Map<String, Object> inMap) {
+ if (!inMap.get(entryName).equals(NOT_SET)) {
+ throttleConfigs.add(new StateTransitionThrottleConfig(rebalanceType, throttleScope,
+ Integer.valueOf((String) inMap.get(entryName))));
+ }
+ }
+
+ private final class Mock {
+ private ResourceControllerDataProvider cache = mock(ResourceControllerDataProvider.class);
+ }
+}
diff --git a/helix-core/src/test/resources/TestReplicaLevelThrottling.MultiTopStates.json b/helix-core/src/test/resources/TestReplicaLevelThrottling.MultiTopStates.json
new file mode 100644
index 0000000..35bdb6d
--- /dev/null
+++ b/helix-core/src/test/resources/TestReplicaLevelThrottling.MultiTopStates.json
@@ -0,0 +1,281 @@
+{
+ "stateModel": "OnlineOffline",
+ "minActiveReplica": "2",
+ "numReplica": "3",
+ "testCases": [
+ {
+ "description": "With pending message, still in the quota range, no throttling",
+ "clusterThrottleLoad": "-1",
+ "resourceThrottleLoad": "-1",
+ "instanceThrottleLoad": "2",
+ "instanceThrottleRecovery": "-1",
+ "partitionNames": ["partition_0", "partition_1"],
+ "messageOutput": {
+ "partition_0": {
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ }
+ },
+ "preferenceList": {
+ "partition_0": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ],
+ "partition_1": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ]
+ },
+ "currentStates": {
+ "partition_0": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "OFFLINE",
+ "localhost_12915": "OFFLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "OFFLINE"
+ }
+ },
+ "bestPossible": {
+ "partition_0": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ }
+ },
+ "pendingMessages": {
+ "partition_1": {
+ "localhost_12915": "ONLINE"
+ }
+ },
+ "expectedOutput": {
+ "partition_0": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ }
+ }
+ },
+ {
+ "description": "With pending message, only on message allow per instance. Messages to 12915 should be throttled",
+ "clusterThrottleLoad": "-1",
+ "resourceThrottleLoad": "-1",
+ "instanceThrottleLoad": "1",
+ "instanceThrottleRecovery": "-1",
+ "partitionNames": ["partition_0", "partition_1"],
+ "messageOutput": {
+ "partition_0": {
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ }
+ },
+ "preferenceList": {
+ "partition_0": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ],
+ "partition_1": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ]
+ },
+ "currentStates": {
+ "partition_0": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "OFFLINE",
+ "localhost_12915": "OFFLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "OFFLINE"
+ }
+ },
+ "bestPossible": {
+ "partition_0": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ }
+ },
+ "pendingMessages": {
+ "partition_1": {
+ "localhost_12915": "ONLINE"
+ }
+ },
+ "expectedOutput": {
+ "partition_0": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "OFFLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ }
+ }
+ },
+ {
+ "description": "Throttle happens at resource level with limit 1 and there is a topology change",
+ "clusterThrottleLoad": "-1",
+ "resourceThrottleLoad": "2",
+ "instanceThrottleLoad": "10",
+ "instanceThrottleRecovery": "-1",
+ "partitionNames": ["partition_0", "partition_1"],
+ "messageOutput": {
+ "partition_0": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ }
+ },
+ "preferenceList": {
+ "partition_0": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ],
+ "partition_1": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ]
+ },
+ "currentStates": {
+ "partition_0": {
+ "localhost_12913": "OFFLINE",
+ "localhost_12914": "OFFLINE",
+ "localhost_12915": "OFFLINE",
+ "localhost_12916": "ONLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "OFFLINE"
+ }
+ },
+ "bestPossible": {
+ "partition_0": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ }
+ },
+ "pendingMessages": {
+ "partition_1": {
+ "localhost_12915": "ONLINE"
+ }
+ },
+ "expectedOutput": {
+ "partition_0": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "OFFLINE",
+ "localhost_12916": "ONLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ }
+ }
+ },
+ {
+ "description": "Without pending message, for different priority of partitions, throttle results are different",
+ "clusterThrottleLoad": "-1",
+ "resourceThrottleLoad": "2",
+ "instanceThrottleLoad": "10",
+ "instanceThrottleRecovery": "-1",
+ "partitionNames": ["partition_0", "partition_1"],
+ "messageOutput": {
+ "partition_0": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ },
+ "partition_1": {
+ "localhost_12915": "ONLINE"
+ }
+ },
+ "preferenceList": {
+ "partition_0": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ],
+ "partition_1": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ]
+ },
+ "currentStates": {
+ "partition_0": {
+ "localhost_12913": "OFFLINE",
+ "localhost_12914": "OFFLINE",
+ "localhost_12915": "OFFLINE",
+ "localhost_12916": "ONLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "OFFLINE"
+ }
+ },
+ "bestPossible": {
+ "partition_0": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE"
+ }
+ },
+ "pendingMessages": {
+ },
+ "expectedOutput": {
+ "partition_0": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "ONLINE",
+ "localhost_12916": "ONLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "ONLINE",
+ "localhost_12914": "ONLINE",
+ "localhost_12915": "OFFLINE"
+ }
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json b/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
new file mode 100644
index 0000000..9706d16
--- /dev/null
+++ b/helix-core/src/test/resources/TestReplicaLevelThrottling.SingleTopState.json
@@ -0,0 +1,280 @@
+{
+ "stateModel": "LeaderStandby",
+ "minActiveReplica": "2",
+ "numReplica": "3",
+ "testCases": [
+ {
+ "description": "With pending message, still in the quota range, no throttling",
+ "clusterThrottleLoad": "-1",
+ "resourceThrottleLoad": "-1",
+ "instanceThrottleLoad": "2",
+ "instanceThrottleRecovery": "-1",
+ "partitionNames": ["partition_0", "partition_1"],
+ "messageOutput": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "preferenceList": {
+ "partition_0": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ],
+ "partition_1": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ]
+ },
+ "currentStates": {
+ "partition_0": {
+ "localhost_12913": "STANDBY",
+ "localhost_12914": "OFFLINE",
+ "localhost_12915": "OFFLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "OFFLINE"
+ }
+ },
+ "bestPossible": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "pendingMessages": {
+ "partition_1": {
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "expectedOutput": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ }
+ }
+ },
+ {
+ "description": "With pending message, only on message allow per instance. Messages to 12915 should be throttled",
+ "clusterThrottleLoad": "-1",
+ "resourceThrottleLoad": "-1",
+ "instanceThrottleLoad": "1",
+ "instanceThrottleRecovery": "-1",
+ "partitionNames": ["partition_0", "partition_1"],
+ "messageOutput": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "preferenceList": {
+ "partition_0": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ],
+ "partition_1": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ]
+ },
+ "currentStates": {
+ "partition_0": {
+ "localhost_12913": "STANDBY",
+ "localhost_12914": "OFFLINE",
+ "localhost_12915": "OFFLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "OFFLINE"
+ }
+ },
+ "bestPossible": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "pendingMessages": {
+ "partition_1": {
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "expectedOutput": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "OFFLINE"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ }
+ }
+ },
+ {
+ "description": "Throttle happens at resource level with limit 1 and there is a topology change",
+ "clusterThrottleLoad": "-1",
+ "resourceThrottleLoad": "1",
+ "instanceThrottleLoad": "10",
+ "instanceThrottleRecovery": "-1",
+ "partitionNames": ["partition_0", "partition_1"],
+ "messageOutput": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "preferenceList": {
+ "partition_0": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ],
+ "partition_1": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ]
+ },
+ "currentStates": {
+ "partition_0": {
+ "localhost_12913": "STANDBY",
+ "localhost_12914": "OFFLINE",
+ "localhost_12915": "OFFLINE",
+ "localhost_12916": "STANDBY"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "OFFLINE"
+ }
+ },
+ "bestPossible": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "pendingMessages": {
+ "partition_1": {
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "expectedOutput": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "OFFLINE",
+ "localhost_12915": "OFFLINE",
+ "localhost_12916": "STANDBY"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ }
+ }
+ },
+ {
+ "description": "Without pending message, for different priority of partitions, throttle results are different",
+ "clusterThrottleLoad": "-1",
+ "resourceThrottleLoad": "1",
+ "instanceThrottleLoad": "10",
+ "instanceThrottleRecovery": "-1",
+ "partitionNames": ["partition_0", "partition_1"],
+ "messageOutput": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "preferenceList": {
+ "partition_0": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ],
+ "partition_1": [
+ "localhost_12913",
+ "localhost_12914",
+ "localhost_12915"
+ ]
+ },
+ "currentStates": {
+ "partition_0": {
+ "localhost_12913": "STANDBY",
+ "localhost_12914": "OFFLINE",
+ "localhost_12915": "OFFLINE",
+ "localhost_12916": "STANDBY"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "OFFLINE"
+ }
+ },
+ "bestPossible": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "STANDBY"
+ }
+ },
+ "pendingMessages": {
+ },
+ "expectedOutput": {
+ "partition_0": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "OFFLINE",
+ "localhost_12916": "STANDBY"
+ },
+ "partition_1": {
+ "localhost_12913": "LEADER",
+ "localhost_12914": "STANDBY",
+ "localhost_12915": "OFFLINE"
+ }
+ }
+ }
+ ]
+}
\ No newline at end of file