You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/04/23 21:18:26 UTC
[helix] 02/03: New PartitionMovementConstraint and
BaselineInfluenceConstraint for Waged (#1658)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedImprove
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 89877b8f7fe2ce8ca6471a425edd77500c9458db
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Mon Mar 8 16:44:31 2021 -0800
New PartitionMovementConstraint and BaselineInfluenceConstraint for Waged (#1658)
This PR splits PartitionMovementConstraint into separate constraints that control baseline convergence and best possible movements respectively.
Co-authored-by: Neal Sun <ne...@nesun-mn1.linkedin.biz>
---
.../AbstractPartitionMovementConstraint.java | 86 ++++++++++++++++
.../constraints/BaselineInfluenceConstraint.java | 50 ++++++++++
.../ConstraintBasedAlgorithmFactory.java | 39 ++++++--
.../constraints/PartitionMovementConstraint.java | 83 ++-------------
.../java/org/apache/helix/model/ClusterConfig.java | 31 ++++--
.../TestPartitionMovementConstraint.java | 111 +++++++--------------
.../org/apache/helix/model/TestClusterConfig.java | 23 ++++-
7 files changed, 253 insertions(+), 170 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java
new file mode 100644
index 0000000..913e042
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/AbstractPartitionMovementConstraint.java
@@ -0,0 +1,86 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+
+/**
+ * Evaluate the proposed assignment according to the potential partition movements cost.
+ * The cost is evaluated based on the difference between the old assignment and the new assignment.
+ * Any change from the old assignment will increase the partition movements cost, so that the
+ * evaluated score will become lower.
+ */
+abstract class AbstractPartitionMovementConstraint extends SoftConstraint {
+ protected static final double MAX_SCORE = 1f;
+ protected static final double MIN_SCORE = 0f;
+
+ private static final double STATE_TRANSITION_COST_FACTOR = 0.5;
+
+ AbstractPartitionMovementConstraint() {
+ super(MAX_SCORE, MIN_SCORE);
+ }
+
+ /**
+ * @return MAX_SCORE if the proposed assignment completely matches the previous assignment.
+ * StateTransitionCostFactor if the proposed assignment's allocation matches the
+ * previous assignment but state does not match.
+ * MIN_SCORE if the proposed assignment completely doesn't match the previous one.
+ */
+ @Override
+ protected abstract double getAssignmentScore(AssignableNode node, AssignableReplica replica,
+ ClusterContext clusterContext);
+
+ protected Map<String, String> getStateMap(AssignableReplica replica,
+ Map<String, ResourceAssignment> assignment) {
+ String resourceName = replica.getResourceName();
+ String partitionName = replica.getPartitionName();
+ if (assignment == null || !assignment.containsKey(resourceName)) {
+ return Collections.emptyMap();
+ }
+ return assignment.get(resourceName).getReplicaMap(new Partition(partitionName));
+ }
+
+ protected double calculateAssignmentScore(String nodeName, String state,
+ Map<String, String> instanceToStateMap) {
+ if (instanceToStateMap.containsKey(nodeName)) {
+ // The score when the proposed allocation partially matches the assignment plan but will
+ // require a state transition.
+ double scoreWithStateTransitionCost =
+ MIN_SCORE + (MAX_SCORE - MIN_SCORE) * STATE_TRANSITION_COST_FACTOR;
+ // if state matches, no state transition required for the proposed assignment; if state does
+ // not match, then the proposed assignment requires state transition.
+ return state.equals(instanceToStateMap.get(nodeName)) ? MAX_SCORE
+ : scoreWithStateTransitionCost;
+ }
+ return MIN_SCORE;
+ }
+
+ @Override
+ protected NormalizeFunction getNormalizeFunction() {
+ return (score) -> score;
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java
new file mode 100644
index 0000000..5e3fcd2
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/BaselineInfluenceConstraint.java
@@ -0,0 +1,50 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+
+/**
+ * Evaluate the proposed assignment according to the potential partition movements cost based on
+ * the baseline assignment's influence.
+ * This constraint promotes movements for evenness. If best possible doesn't exist, baseline will be
+ * used to restrict movements, so this constraint should give no score in that case.
+ */
+public class BaselineInfluenceConstraint extends AbstractPartitionMovementConstraint {
+ @Override
+ protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
+ ClusterContext clusterContext) {
+ Map<String, String> bestPossibleAssignment =
+ getStateMap(replica, clusterContext.getBestPossibleAssignment());
+ if (bestPossibleAssignment.isEmpty()) {
+ return getMinScore();
+ }
+
+ Map<String, String> baselineAssignment =
+ getStateMap(replica, clusterContext.getBaselineAssignment());
+ return calculateAssignmentScore(node.getInstanceName(), replica.getReplicaState(),
+ baselineAssignment);
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 33aa6c8..032c7b5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -39,12 +39,17 @@ public class ConstraintBasedAlgorithmFactory {
{
// The default setting
put(PartitionMovementConstraint.class.getSimpleName(), 2f);
+ put(BaselineInfluenceConstraint.class.getSimpleName(), 0.5f);
put(InstancePartitionsCountConstraint.class.getSimpleName(), 1f);
put(ResourcePartitionAntiAffinityConstraint.class.getSimpleName(), 1f);
put(TopStateMaxCapacityUsageInstanceConstraint.class.getSimpleName(), 3f);
put(MaxCapacityUsageInstanceConstraint.class.getSimpleName(), 6f);
}
};
+ // The weight for BaselineInfluenceConstraint used when we are forcing a baseline converge. This
+ // number, multiplied by the max score returned by BaselineInfluenceConstraint, must be greater
+ // than the total maximum sum of all other constraints, in order to overpower other constraints.
+ private static final float FORCE_BASELINE_CONVERGE_WEIGHT = 10000f;
static {
Properties properties =
@@ -61,23 +66,37 @@ public class ConstraintBasedAlgorithmFactory {
new ReplicaActivateConstraint(), new NodeMaxPartitionLimitConstraint(),
new ValidGroupTagConstraint(), new SamePartitionOnInstanceConstraint());
- int evennessPreference =
- preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1);
- int movementPreference =
- preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1);
+ int evennessPreference = preferences
+ .getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS,
+ ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE
+ .get(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS));
+ int movementPreference = preferences
+ .getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT,
+ ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE
+ .get(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT));
+ boolean forceBaselineConverge = preferences
+ .getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE, 0)
+ > 0;
List<SoftConstraint> softConstraints = ImmutableList
- .of(new PartitionMovementConstraint(), new InstancePartitionsCountConstraint(),
- new ResourcePartitionAntiAffinityConstraint(),
+ .of(new PartitionMovementConstraint(), new BaselineInfluenceConstraint(),
+ new InstancePartitionsCountConstraint(), new ResourcePartitionAntiAffinityConstraint(),
new TopStateMaxCapacityUsageInstanceConstraint(),
new MaxCapacityUsageInstanceConstraint());
Map<SoftConstraint, Float> softConstraintsWithWeight = Maps.toMap(softConstraints, key -> {
- String name = key.getClass().getSimpleName();
- float weight = MODEL.get(name);
- return name.equals(PartitionMovementConstraint.class.getSimpleName()) ?
- movementPreference * weight : evennessPreference * weight;
+ if (key instanceof BaselineInfluenceConstraint && forceBaselineConverge) {
+ return FORCE_BASELINE_CONVERGE_WEIGHT;
+ }
+
+ float weight = MODEL.get(key.getClass().getSimpleName());
+ // Note that BaselineInfluenceConstraint is a constraint that promotes movement for evenness,
+ // and is therefore controlled by the evenness preference. Only PartitionMovementConstraint
+ // contributes to less movement.
+ return key instanceof PartitionMovementConstraint ? movementPreference * weight
+ : evennessPreference * weight;
});
+
return new ConstraintBasedAlgorithm(hardConstraints, softConstraintsWithWeight);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
index 351e33d..08c135d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/PartitionMovementConstraint.java
@@ -19,47 +19,20 @@ package org.apache.helix.controller.rebalancer.waged.constraints;
* under the License.
*/
-import java.util.Collections;
import java.util.Map;
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceAssignment;
+
/**
- * Evaluate the proposed assignment according to the potential partition movements cost.
- * The cost is evaluated based on the difference between the old assignment and the new assignment.
- * In detail, we consider the following two previous assignments as the base.
- * - Baseline assignment that is calculated regardless of the node state (online/offline).
- * - Previous Best Possible assignment.
- * Any change to these two assignments will increase the partition movements cost, so that the
- * evaluated score will become lower.
+ * Evaluate the proposed assignment according to the potential partition movements cost based on
+ * the previous best possible assignment.
+ * The previous best possible assignment is the sole reference; if it's missing, it means the
+ * replica belongs to a newly added resource, so baseline assignment should be used instead.
*/
-class PartitionMovementConstraint extends SoftConstraint {
- private static final double MAX_SCORE = 1f;
- private static final double MIN_SCORE = 0f;
- // The scale factor to adjust score when the proposed allocation partially matches the assignment
- // plan but will require a state transition (with partition movement).
- // TODO: these factors will be tuned based on user's preference
- private static final double STATE_TRANSITION_COST_FACTOR = 0.5;
- private static final double MOVEMENT_COST_FACTOR = 0.25;
-
- PartitionMovementConstraint() {
- super(MAX_SCORE, MIN_SCORE);
- }
-
- /**
- * @return 1 if the proposed assignment completely matches the previous best possible assignment
- * (or baseline assignment if the replica is newly added).
- * STATE_TRANSITION_COST_FACTOR if the proposed assignment's allocation matches the
- * previous Best Possible assignment (or baseline assignment if the replica is newly
- * added) but state does not match.
- * MOVEMENT_COST_FACTOR if the proposed assignment's allocation matches the baseline
- * assignment only, but not matches the previous best possible assignment.
- * 0 if the proposed assignment is a pure random movement.
- */
+public class PartitionMovementConstraint extends AbstractPartitionMovementConstraint {
@Override
protected double getAssignmentScore(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
@@ -71,48 +44,10 @@ class PartitionMovementConstraint extends SoftConstraint {
String state = replica.getReplicaState();
if (bestPossibleAssignment.isEmpty()) {
- // If bestPossibleAssignment of the replica is empty, indicating this is a new replica.
- // Then the baseline is the only reference.
+ // if best possible is missing, it means the replica belongs to a newly added resource, so
+ // baseline assignment should be used instead.
return calculateAssignmentScore(nodeName, state, baselineAssignment);
- } else {
- // Else, for minimizing partition movements or state transitions, prioritize the proposed
- // assignment that matches the previous Best Possible assignment.
- double score = calculateAssignmentScore(nodeName, state, bestPossibleAssignment);
- // If no Best Possible assignment matches, check the baseline assignment.
- if (score == 0 && baselineAssignment.containsKey(nodeName)) {
- // Although not desired, the proposed assignment that matches the baseline is still better
- // than a random movement. So try to evaluate the score with the MOVEMENT_COST_FACTOR
- // punishment.
- score = MOVEMENT_COST_FACTOR;
- }
- return score;
- }
- }
-
- private Map<String, String> getStateMap(AssignableReplica replica,
- Map<String, ResourceAssignment> assignment) {
- String resourceName = replica.getResourceName();
- String partitionName = replica.getPartitionName();
- if (assignment == null || !assignment.containsKey(resourceName)) {
- return Collections.emptyMap();
}
- return assignment.get(resourceName).getReplicaMap(new Partition(partitionName));
- }
-
- private double calculateAssignmentScore(String nodeName, String state,
- Map<String, String> instanceToStateMap) {
- if (instanceToStateMap.containsKey(nodeName)) {
- return state.equals(instanceToStateMap.get(nodeName)) ?
- 1 : // if state matches, no state transition required for the proposed assignment
- STATE_TRANSITION_COST_FACTOR; // if state does not match,
- // then the proposed assignment requires state transition.
- }
- return 0;
- }
-
- @Override
- protected NormalizeFunction getNormalizeFunction() {
- // PartitionMovementConstraint already scale the score properly.
- return (score) -> score;
+ return calculateAssignmentScore(nodeName, state, bestPossibleAssignment);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 492ac7f..ccb1684 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -138,8 +138,10 @@ public class ClusterConfig extends HelixProperty {
}
public enum GlobalRebalancePreferenceKey {
+ // EVENNESS and LESS_MOVEMENT must be both specified
EVENNESS,
- LESS_MOVEMENT
+ LESS_MOVEMENT,
+ FORCE_BASELINE_CONVERGE,
}
private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40;
@@ -160,7 +162,8 @@ public class ClusterConfig extends HelixProperty {
DEFAULT_GLOBAL_REBALANCE_PREFERENCE =
ImmutableMap.<GlobalRebalancePreferenceKey, Integer>builder()
.put(GlobalRebalancePreferenceKey.EVENNESS, 1)
- .put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1).build();
+ .put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1)
+ .put(GlobalRebalancePreferenceKey.FORCE_BASELINE_CONVERGE, 0).build();
private final static int MAX_REBALANCE_PREFERENCE = 10;
private final static int MIN_REBALANCE_PREFERENCE = 0;
public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
@@ -862,14 +865,22 @@ public class ClusterConfig extends HelixProperty {
/**
* Set the global rebalancer's assignment preference.
- * @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight.
- * The ratio of the configured weights will determine the rebalancer's behavior.
+ * @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weights.
+ * The weights will determine the rebalancer's behavior. Note that
+ * GlobalRebalancePreferenceKey.EVENNESS and
+ * GlobalRebalancePreferenceKey.LESS_MOVEMENT must be both specified or not
+ * specified, or an exception will be thrown.
* If null, the preference item will be removed from the config.
*/
public void setGlobalRebalancePreference(Map<GlobalRebalancePreferenceKey, Integer> preference) {
if (preference == null) {
_record.getMapFields().remove(ClusterConfigProperty.REBALANCE_PREFERENCE.name());
} else {
+ if (preference.containsKey(GlobalRebalancePreferenceKey.EVENNESS) != preference
+ .containsKey(GlobalRebalancePreferenceKey.LESS_MOVEMENT)) {
+ throw new IllegalArgumentException("GlobalRebalancePreferenceKey.EVENNESS and "
+ + "GlobalRebalancePreferenceKey.LESS_MOVEMENT must be both specified or not specified");
+ }
Map<String, String> preferenceMap = new HashMap<>();
preference.entrySet().stream().forEach(entry -> {
if (entry.getValue() > MAX_REBALANCE_PREFERENCE
@@ -893,11 +904,15 @@ public class ClusterConfig extends HelixProperty {
if (preferenceStrMap != null && !preferenceStrMap.isEmpty()) {
Map<GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
for (GlobalRebalancePreferenceKey key : GlobalRebalancePreferenceKey.values()) {
- if (!preferenceStrMap.containsKey(key.name())) {
- // If any key is not configured with a value, return the default config.
- return DEFAULT_GLOBAL_REBALANCE_PREFERENCE;
+ if (preferenceStrMap.containsKey(key.name())) {
+ preference.put(key, Integer.parseInt(preferenceStrMap.get(key.name())));
}
- preference.put(key, Integer.parseInt(preferenceStrMap.get(key.name())));
+ }
+ // In case this map is set incorrectly, check for both attributes to ensure strong pairing
+ if (preference.containsKey(GlobalRebalancePreferenceKey.EVENNESS) != preference
+ .containsKey(GlobalRebalancePreferenceKey.LESS_MOVEMENT)) {
+ preference.remove(GlobalRebalancePreferenceKey.EVENNESS);
+ preference.remove(GlobalRebalancePreferenceKey.LESS_MOVEMENT);
}
return preference;
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
index d36f629..16c1994 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestPartitionMovementConstraint.java
@@ -42,7 +42,8 @@ public class TestPartitionMovementConstraint {
private AssignableNode _testNode;
private AssignableReplica _testReplica;
private ClusterContext _clusterContext;
- private SoftConstraint _constraint = new PartitionMovementConstraint();
+ private SoftConstraint _baselineInfluenceConstraint = new BaselineInfluenceConstraint();
+ private SoftConstraint _partitionMovementConstraint = new PartitionMovementConstraint();
@BeforeMethod
public void init() {
@@ -58,42 +59,33 @@ public class TestPartitionMovementConstraint {
public void testGetAssignmentScoreWhenBestPossibleBaselineMissing() {
when(_clusterContext.getBaselineAssignment()).thenReturn(Collections.emptyMap());
when(_clusterContext.getBestPossibleAssignment()).thenReturn(Collections.emptyMap());
- double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
- double normalizedScore =
- _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
- Assert.assertEquals(score, 0.0);
- Assert.assertEquals(normalizedScore, 0.0);
+
+ verifyScore(_baselineInfluenceConstraint, _testNode, _testReplica, _clusterContext, 0.0, 0.0);
+ verifyScore(_partitionMovementConstraint, _testNode, _testReplica, _clusterContext, 0.0, 0.0);
}
@Test
- public void testGetAssignmentScoreWhenBestPossibleBaselineSame() {
+ public void testGetAssignmentScoreWhenBestPossibleMissing() {
ResourceAssignment mockResourceAssignment = mock(ResourceAssignment.class);
when(mockResourceAssignment.getReplicaMap(new Partition(PARTITION)))
.thenReturn(ImmutableMap.of(INSTANCE, "Master"));
Map<String, ResourceAssignment> assignmentMap =
ImmutableMap.of(RESOURCE, mockResourceAssignment);
when(_clusterContext.getBaselineAssignment()).thenReturn(assignmentMap);
- when(_clusterContext.getBestPossibleAssignment()).thenReturn(assignmentMap);
+ when(_clusterContext.getBestPossibleAssignment()).thenReturn(Collections.emptyMap());
// when the calculated states are both equal to the replica's current state
when(_testReplica.getReplicaState()).thenReturn("Master");
- double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
- double normalizedScore =
- _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+ verifyScore(_baselineInfluenceConstraint, _testNode, _testReplica, _clusterContext, 0.0, 0.0);
+ verifyScore(_partitionMovementConstraint, _testNode, _testReplica, _clusterContext, 1.0, 1.0);
- Assert.assertEquals(score, 1.0);
- Assert.assertEquals(normalizedScore, 1.0);
// when the calculated states are both different from the replica's current state
when(_testReplica.getReplicaState()).thenReturn("Slave");
- score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
- normalizedScore =
- _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
-
- Assert.assertEquals(score, 0.5);
- Assert.assertEquals(normalizedScore, 0.5);
+ verifyScore(_baselineInfluenceConstraint, _testNode, _testReplica, _clusterContext, 0.0, 0.0);
+ verifyScore(_partitionMovementConstraint, _testNode, _testReplica, _clusterContext, 0.5, 0.5);
}
@Test
- public void testGetAssignmentScoreWhenBestPossibleBaselineOpposite() {
+ public void testGetAssignmentScore() {
String instanceNameA = INSTANCE + "A";
String instanceNameB = INSTANCE + "B";
String instanceNameC = INSTANCE + "C";
@@ -110,72 +102,45 @@ public class TestPartitionMovementConstraint {
when(_clusterContext.getBaselineAssignment())
.thenReturn(ImmutableMap.of(RESOURCE, baselineResourceAssignment));
- // when the replica's state matches with best possible
+ // when the replica's state matches with best possible, allocation matches with baseline
when(testAssignableNode.getInstanceName()).thenReturn(instanceNameA);
when(_testReplica.getReplicaState()).thenReturn("Master");
- double score =
- _constraint.getAssignmentScore(testAssignableNode, _testReplica, _clusterContext);
- double normalizedScore =
- _constraint.getAssignmentNormalizedScore(testAssignableNode, _testReplica, _clusterContext);
- Assert.assertEquals(score, 1.0);
- Assert.assertEquals(normalizedScore, 1.0);
-
- // when the replica's allocation matches with best possible
+ verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
+ 0.5, 0.5);
+ verifyScore(_partitionMovementConstraint, testAssignableNode, _testReplica, _clusterContext,
+ 1.0, 1.0);
+
+ // when the replica's allocation matches with best possible only
when(testAssignableNode.getInstanceName()).thenReturn(instanceNameB);
when(_testReplica.getReplicaState()).thenReturn("Master");
- score = _constraint.getAssignmentScore(testAssignableNode, _testReplica, _clusterContext);
- normalizedScore =
- _constraint.getAssignmentNormalizedScore(testAssignableNode, _testReplica, _clusterContext);
- Assert.assertEquals(score, 0.5);
- Assert.assertEquals(normalizedScore, 0.5);
+ verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
+ 0.0, 0.0);
+ verifyScore(_partitionMovementConstraint, testAssignableNode, _testReplica, _clusterContext,
+ 0.5, 0.5);
// when the replica's state matches with baseline only
when(testAssignableNode.getInstanceName()).thenReturn(instanceNameC);
when(_testReplica.getReplicaState()).thenReturn("Master");
- score = _constraint.getAssignmentScore(testAssignableNode, _testReplica, _clusterContext);
- normalizedScore =
- _constraint.getAssignmentNormalizedScore(testAssignableNode, _testReplica, _clusterContext);
- // The calculated score is lower than previous value cause the replica's state matches with
- // best possible is preferred
- Assert.assertEquals(score, 0.25);
- Assert.assertEquals(normalizedScore, 0.25);
+ verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
+ 1.0, 1.0);
+ verifyScore(_partitionMovementConstraint, testAssignableNode, _testReplica, _clusterContext,
+ 0.0, 0.0);
// when the replica's allocation matches with baseline only
when(testAssignableNode.getInstanceName()).thenReturn(instanceNameC);
when(_testReplica.getReplicaState()).thenReturn("Slave");
- score = _constraint.getAssignmentScore(testAssignableNode, _testReplica, _clusterContext);
- normalizedScore =
- _constraint.getAssignmentNormalizedScore(testAssignableNode, _testReplica, _clusterContext);
- // The calculated score is lower than previous value cause the replica's state matches with
- // best possible is preferred
- Assert.assertEquals(score, 0.25);
- Assert.assertEquals(normalizedScore, 0.25);
+ verifyScore(_baselineInfluenceConstraint, testAssignableNode, _testReplica, _clusterContext,
+ 0.5, 0.5);
+ verifyScore(_partitionMovementConstraint, testAssignableNode, _testReplica, _clusterContext,
+ 0.0, 0.0);
}
- @Test
- public void testGetAssignmentScoreWhenBestPossibleMissing() {
- ResourceAssignment mockResourceAssignment = mock(ResourceAssignment.class);
- when(mockResourceAssignment.getReplicaMap(new Partition(PARTITION)))
- .thenReturn(ImmutableMap.of(INSTANCE, "Master"));
- Map<String, ResourceAssignment> assignmentMap =
- ImmutableMap.of(RESOURCE, mockResourceAssignment);
- when(_clusterContext.getBaselineAssignment()).thenReturn(assignmentMap);
- when(_clusterContext.getBestPossibleAssignment()).thenReturn(Collections.emptyMap());
- // when the calculated states are both equal to the replica's current state
- when(_testReplica.getReplicaState()).thenReturn("Master");
- double score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
- double normalizedScore =
- _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
-
- Assert.assertEquals(score, 1.0);
- Assert.assertEquals(normalizedScore, 1.0);
- // when the calculated states are both different from the replica's current state
- when(_testReplica.getReplicaState()).thenReturn("Slave");
- score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext);
- normalizedScore =
- _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
-
- Assert.assertEquals(score, 0.5);
- Assert.assertEquals(normalizedScore, 0.5);
+ private static void verifyScore(SoftConstraint constraint, AssignableNode node,
+ AssignableReplica replica, ClusterContext clusterContext, double expectedScore,
+ double expectedNormalizedScore) {
+ double score = constraint.getAssignmentScore(node, replica, clusterContext);
+ double normalizedScore = constraint.getAssignmentNormalizedScore(node, replica, clusterContext);
+ Assert.assertEquals(score, expectedScore);
+ Assert.assertEquals(normalizedScore, expectedNormalizedScore);
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 8d6b0a2..3690ca4 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -122,13 +122,17 @@ public class TestClusterConfig {
ClusterConfig testConfig = new ClusterConfig("testId");
Assert.assertEquals(testConfig.getGlobalRebalancePreference(),
ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
+ }
- Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
- preference.put(EVENNESS, 5);
- testConfig.setGlobalRebalancePreference(preference);
+ @Test
+ public void testGetRebalancePreferenceMissingKey() {
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ Map<String, String> preference = new HashMap<>();
+ preference.put(EVENNESS.name(), String.valueOf(5));
+ testConfig.getRecord()
+ .setMapField(ClusterConfig.ClusterConfigProperty.REBALANCE_PREFERENCE.name(), preference);
- Assert.assertEquals(testConfig.getGlobalRebalancePreference(),
- ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
+ Assert.assertEquals(testConfig.getGlobalRebalancePreference(), Collections.emptyMap());
}
@Test
@@ -171,6 +175,15 @@ public class TestClusterConfig {
testConfig.setGlobalRebalancePreference(preference);
}
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testSetRebalancePreferenceMissingKey() {
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+ preference.put(EVENNESS, 1);
+
+ ClusterConfig testConfig = new ClusterConfig("testId");
+ testConfig.setGlobalRebalancePreference(preference);
+ }
+
@Test
public void testGetInstanceCapacityMap() {
Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", 3);