You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2019/11/06 21:34:08 UTC
[helix] branch master updated: Fix partitions double charged for
pending message
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 554dcd0 Fix partitions double charged for pending message
554dcd0 is described below
commit 554dcd0e328f96462c4712019be7513d9166b4ee
Author: Junkai Xue <jx...@linkedin.com>
AuthorDate: Fri Nov 1 17:00:25 2019 -0700
Fix partitions double charged for pending message
Currently, Helix charges partitions that have pending messages twice, because a partition is not removed from the set of partitions that need recovery or load balance after it has been charged for its pending message.
This change fixes the problem for pending messages charged under the recovery, load, or ANY rebalance type.
---
.../stages/IntermediateStateCalcStage.java | 45 ++++++++++++-
.../stages/TestIntermediateStateCalcStage.java | 77 ++++++++++++----------
2 files changed, 85 insertions(+), 37 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index da31b37..37df042 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -363,7 +363,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
}
chargePendingTransition(resource, currentStateOutput, throttleController,
- partitionsNeedRecovery, partitionsNeedLoadBalance, cache);
+ partitionsNeedRecovery, partitionsNeedLoadBalance, cache,
+ bestPossiblePartitionStateMap, intermediatePartitionStateMap);
// Perform recovery balance
Set<Partition> recoveryThrottledPartitions =
@@ -461,7 +462,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
*/
private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput,
StateTransitionThrottleController throttleController, Set<Partition> partitionsNeedRecovery,
- Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache) {
+ Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider cache,
+ PartitionStateMap bestPossiblePartitionStateMap,
+ PartitionStateMap intermediatePartitionStateMap) {
String resourceName = resource.getResourceName();
// check and charge pending transitions
@@ -487,10 +490,20 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
String pendingState = pendingMap.get(instance);
if (pendingState != null && !pendingState.equals(currentState)
&& !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName())
- .contains(instance)) {
+ .contains(instance)) {
// Only charge this instance if the partition is not disabled
throttleController.chargeInstance(rebalanceType, instance);
shouldChargePartition = true;
+ // If there is a pending state transition for the partition, that means that an assignment
+ // has already been made and the state transition message has already been sent out for the partition
+ // in a previous pipeline run. We must honor this and reflect it by charging for the pending state transition message.
+
+ // Since the assignment has already been made for the pending message, we do a special treatment
+ // for it by setting the best possible state directly in intermediatePartitionStateMap so that the pending
+ // message won't be double-assigned or double-charged in recovery or load balance.
+ handlePendingStateTransitionsForThrottling(partition, partitionsNeedRecovery,
+ partitionsNeedLoadbalance, rebalanceType, bestPossiblePartitionStateMap,
+ intermediatePartitionStateMap);
}
}
if (shouldChargePartition) {
@@ -945,4 +958,30 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
return matchedState;
}
}
+
+ /**
+ * Handle a partition with a pending message so that the partition will not be double-charged or double-assigned during recovery and load balance.
+ * @param partition
+ * @param partitionsNeedRecovery
+ * @param partitionsNeedLoadbalance
+ * @param rebalanceType
+ */
+ private void handlePendingStateTransitionsForThrottling(Partition partition,
+ Set<Partition> partitionsNeedRecovery, Set<Partition> partitionsNeedLoadbalance,
+ RebalanceType rebalanceType, PartitionStateMap bestPossiblePartitionStateMap,
+ PartitionStateMap intermediatePartitionStateMap) {
+ // Pass the best possible state directly into intermediatePartitionStateMap
+ // This is safe to do so because we already have a pending transition for this partition, implying that the assignment has been made in previous pipeline
+ intermediatePartitionStateMap
+ .setState(partition, bestPossiblePartitionStateMap.getPartitionMap(partition));
+ // Remove the partition's name from the set of partition (names) that need to be charged and assigned to prevent double-processing
+ switch (rebalanceType) {
+ case RECOVERY_BALANCE:
+ partitionsNeedRecovery.remove(partition);
+ break;
+ case LOAD_BALANCE:
+ partitionsNeedLoadbalance.remove(partition);
+ break;
+ }
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index bb7ed98..dd727ea 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -25,14 +25,18 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.testng.Assert;
import org.testng.annotations.Test;
+
public class TestIntermediateStateCalcStage extends BaseStageTest {
private ClusterConfig _clusterConfig;
@@ -43,17 +47,16 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
int nPartition = 2;
int nReplica = 3;
- Set<String> resourceSet = new HashSet<>();
+ String[] resources = new String[nResource];
for (int i = 0; i < nResource; i++) {
- resourceSet.add(resourcePrefix + "_" + i);
+ resources[i] = resourcePrefix + "_" + i;
}
- preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, resourceSet, nReplica,
- nReplica);
- event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(
- resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline"));
- event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), getResourceMap(
- resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline"));
+ preSetup(resources, nReplica, nReplica);
+ event.addAttribute(AttributeName.RESOURCES.name(),
+ getResourceMap(resources, nPartition, "OnlineOffline"));
+ event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+ getResourceMap(resources, nPartition, "OnlineOffline"));
// Initialize bestpossible state and current state
BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
@@ -64,7 +67,7 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
_clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(1);
setClusterConfig(_clusterConfig);
- for (String resource : resourceSet) {
+ for (String resource : resources) {
IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource));
setSingleIdealState(is);
@@ -77,6 +80,11 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
if (resource.endsWith("0")) {
// Regular recovery balance
currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
+ // add blocked state transition messages
+ Message pendingMessage = new Message("customType", "001");
+ pendingMessage.setToState("ONLINE");
+ currentStateOutput.setPendingMessage(resource, partition, instanceName, pendingMessage);
+
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
// should be recovered:
expectedResult.setState(resource, partition, instanceName, "ONLINE");
@@ -109,8 +117,8 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
}
} else {
currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
- currentStateOutput.setCurrentState(resource, partition, instanceName + "-1",
- "OFFLINE");
+ currentStateOutput
+ .setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
// load balance is throttled, so keep all current states
expectedResult.setState(resource, partition, instanceName, "ONLINE");
// The following must be removed because now downward state transitions are allowed
@@ -128,8 +136,8 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
} else {
// Other partitions require dropping of replicas
currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
- currentStateOutput.setCurrentState(resource, partition, instanceName + "-1",
- "OFFLINE");
+ currentStateOutput
+ .setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
// BestPossibleState dictates that we only need one ONLINE replica
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "DROPPED");
@@ -152,7 +160,6 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
// Check that load balance (bringing up a new node) did not take place
bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "ONLINE");
expectedResult.setState(resource, partition, instanceName, "ONLINE");
-
}
}
}
@@ -169,7 +176,7 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
- for (String resource : resourceSet) {
+ for (String resource : resources) {
// Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
// anything.
Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap()
@@ -184,24 +191,24 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
int nPartition = 2;
int nReplica = 3;
- Set<String> resourceSet = new HashSet<>();
+ String[] resources = new String[nResource];
for (int i = 0; i < nResource; i++) {
- resourceSet.add(resourcePrefix + "_" + i);
+ resources[i] = resourcePrefix + "_" + i;
}
- preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, resourceSet,
- nReplica, nReplica);
- event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(
- resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline"));
- event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), getResourceMap(
- resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline"));
+ preSetup(resources, nReplica, nReplica);
+ event.addAttribute(AttributeName.RESOURCES.name(),
+ getResourceMap(resources, nPartition,
+ "OnlineOffline"));
+ event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+ getResourceMap(resources, nPartition, "OnlineOffline"));
// Initialize best possible state and current state
BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
IntermediateStateOutput expectedResult = new IntermediateStateOutput();
- for (String resource : resourceSet) {
+ for (String resource : resources) {
IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource));
setSingleIdealState(is);
@@ -248,26 +255,28 @@ public class TestIntermediateStateCalcStage extends BaseStageTest {
IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
- for (String resource : resourceSet) {
+ for (String resource : resources) {
// Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
// anything.
- Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap()
- .equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
+ Assert.assertEquals(output.getPartitionStateMap(resource).getStateMap(), expectedResult.getPartitionStateMap(resource).getStateMap());
}
}
- private void preSetup(StateTransitionThrottleConfig.RebalanceType rebalanceType,
- Set<String> resourceSet, int numOfLiveInstances, int numOfReplicas) {
- setupIdealState(numOfLiveInstances, resourceSet.toArray(new String[resourceSet.size()]),
- numOfLiveInstances, numOfReplicas, IdealState.RebalanceMode.FULL_AUTO, "OnlineOffline");
+ private void preSetup(String[] resources, int numOfLiveInstances, int numOfReplicas) {
+ setupIdealState(numOfLiveInstances, resources, numOfLiveInstances, numOfReplicas,
+ IdealState.RebalanceMode.FULL_AUTO, "OnlineOffline");
setupStateModel();
setupLiveInstances(numOfLiveInstances);
// Set up cluster configs
_clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
- StateTransitionThrottleConfig throttleConfig = new StateTransitionThrottleConfig(rebalanceType,
- StateTransitionThrottleConfig.ThrottleScope.CLUSTER, Integer.MAX_VALUE);
- _clusterConfig.setStateTransitionThrottleConfigs(Collections.singletonList(throttleConfig));
+ _clusterConfig.setStateTransitionThrottleConfigs(ImmutableList
+ .of(new StateTransitionThrottleConfig(
+ StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+ StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3),
+ new StateTransitionThrottleConfig(
+ StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+ StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3)));
setClusterConfig(_clusterConfig);
}
}