You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:33:01 UTC
[helix] 23/50: Implement one of the soft constraints (#450)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 79685962ec71dc8d5bd421d898b05bc85ef4c5cc
Author: Yi Wang <i3...@gmail.com>
AuthorDate: Tue Sep 10 15:37:15 2019 -0700
Implement one of the soft constraints (#450)
Implement Instance Partitions Count soft constraint.
Evaluate by instance's current partition count versus estimated max partition count.
Intuitively, Encourage the assignment if the instance's occupancy rate is below average;
Discourage the assignment if the instance's occupancy rate is above average.
The final normalized score will be within [0, 1].
The implementation of the class will depend on the cluster current total partitions count as the max score.
---
.../constraints/ConstraintBasedAlgorithm.java | 2 +-
.../InstancePartitionsCountConstraint.java | 47 +++++++++++++++++
.../constraints/LeastPartitionCountConstraint.java | 53 -------------------
.../waged/constraints/SoftConstraint.java | 40 ++++++++++-----
.../constraints/SoftConstraintWeightModel.java | 12 ++---
.../rebalancer/waged/model/ClusterContext.java | 11 ++--
.../constraints/TestConstraintBasedAlgorithm.java | 4 +-
.../TestInstancePartitionsCountConstraint.java | 60 ++++++++++++++++++++++
.../TestSoftConstraintNormalizeFunction.java | 47 +++++++++++++++++
9 files changed, 196 insertions(+), 80 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 99d8d2a..479fb78 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -117,7 +117,7 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
Function<AssignableNode, Float> calculatePoints =
(candidateNode) -> _softConstraintsWeightModel.getSumOfScores(_softConstraints.stream()
.collect(Collectors.toMap(Function.identity(), softConstraint -> softConstraint
- .getAssignmentOriginScore(candidateNode, replica, clusterContext))));
+ .getAssignmentNormalizedScore(candidateNode, replica, clusterContext))));
return candidateNodes.stream().max(Comparator.comparing(calculatePoints));
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/InstancePartitionsCountConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/InstancePartitionsCountConstraint.java
new file mode 100644
index 0000000..ca05cf8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/InstancePartitionsCountConstraint.java
@@ -0,0 +1,47 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+/**
+ * Evaluate by instance's current partition count versus estimated max partition count
+ * Intuitively, Encourage the assignment if the instance's occupancy rate is below average;
+ * Discourage the assignment if the instance's occupancy rate is above average
+ */
+class InstancePartitionsCountConstraint extends SoftConstraint {
+ private static final float MAX_SCORE = 1f;
+ private static final float MIN_SCORE = 0f;
+
+ InstancePartitionsCountConstraint() {
+ super(MAX_SCORE, MIN_SCORE);
+ }
+
+ @Override
+ protected float getAssignmentScore(AssignableNode node, AssignableReplica replica,
+ ClusterContext clusterContext) {
+ float doubleEstimatedMaxPartitionCount = 2 * clusterContext.getEstimatedMaxPartitionCount();
+ float currentPartitionCount = node.getAssignedReplicaCount();
+ return Math.max((doubleEstimatedMaxPartitionCount - currentPartitionCount)
+ / doubleEstimatedMaxPartitionCount, 0);
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/LeastPartitionCountConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/LeastPartitionCountConstraint.java
deleted file mode 100644
index a8d36db..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/LeastPartitionCountConstraint.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.helix.controller.rebalancer.waged.constraints;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
-import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
-import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
-
-/**
- * Evaluate the proposed assignment according to the instance's partition count.
- */
-class LeastPartitionCountConstraint extends SoftConstraint {
- static LeastPartitionCountConstraint INSTANCE = new LeastPartitionCountConstraint();
-
- private LeastPartitionCountConstraint() {
- }
-
- /**
- * Returns a score depending on the number of assignments on this node. The score is scaled evenly
- * between the minScore and maxScore.
- * When the node is idle, return with the maxScore.
- * When the node usage reaches the estimated max partition, return with (minScore + maxScore ) /
- * 2.
- * When the node usage reaches 2 * estimated_max or more, return with the minScore.
- * If the estimated max partition count is not set, it defaults to Integer.MAX_VALUE in
- * clusterContext.
- */
- @Override
- float getAssignmentOriginScore(AssignableNode node, AssignableReplica replica,
- ClusterContext clusterContext) {
- throw new UnsupportedOperationException("The POC implementation has a bug, will fix it as TODO");
-// float doubleMaxPartitionCount = 2.0f * clusterContext.getEstimatedMaxPartitionCount();
-// int curPartitionCount = node.getCurrentAssignmentCount();
-// return Math.max((doubleMaxPartitionCount - curPartitionCount) / doubleMaxPartitionCount, 0);
- }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraint.java
index db145fe..0f2bdbc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraint.java
@@ -24,14 +24,15 @@ import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
/**
- * Evaluate a partition allocation proposal and return a score within the normalized range.
- * A higher score means the proposal is more preferred.
+ * The "soft" constraint evaluates the optimality of an assignment by giving it a score of a scale of [minScore, maxScore]
+ * The higher the score, the better the assignment; Intuitively, the assignment is encouraged.
+ * The lower score the score, the worse the assignment; Intuitively, the assignment is penalized.
*/
abstract class SoftConstraint {
private float _maxScore = 1000f;
private float _minScore = -1000f;
- interface ScalerFunction {
+ interface NormalizeFunction {
/**
* Scale the origin score to a normalized range (0, 1).
* The purpose is to compare scores between different soft constraints.
@@ -57,23 +58,38 @@ abstract class SoftConstraint {
_minScore = minScore;
}
+ float getMaxScore() {
+ return _maxScore;
+ }
+
+ float getMinScore() {
+ return _minScore;
+ }
+
/**
- * The scoring function returns a score between MINIMAL_SCORE and MAXIMUM_SCORE, which is then
- * weighted by the
- * individual normalized constraint weights.
- * Each individual constraint will define the meaning of MINIMAL_SCORE to MAXIMUM_SCORE
- * differently.
- * @return float value representing the score
+ * Evaluate and give a score for an potential assignment partition -> instance
+ * Child class only needs to care about how the score is implemented
+ * @return The score of the assignment in float value
*/
- abstract float getAssignmentOriginScore(AssignableNode node, AssignableReplica replica,
+ protected abstract float getAssignmentScore(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext);
/**
+ * Evaluate and give a score for an potential assignment partition -> instance
+ * It's the only exposed method to the caller
+ * @return The score is normalized to be within MinScore and MaxScore
+ */
+ float getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica,
+ ClusterContext clusterContext) {
+ return getNormalizeFunction().scale(getAssignmentScore(node, replica, clusterContext));
+ }
+
+ /**
* The default scaler function that squashes any score within (min_score, max_score) to (0, 1);
* Child class could override the method and customize the method on its own
* @return The MinMaxScaler instance by default
*/
- ScalerFunction getScalerFunction() {
- return (score) -> (score - _minScore) / (_maxScore - _minScore);
+ NormalizeFunction getNormalizeFunction() {
+ return (score) -> (score - getMinScore()) / (getMaxScore() - getMinScore());
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java
index 41e4334..a961936 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java
@@ -27,7 +27,7 @@ import com.google.common.collect.ImmutableMap;
* The class retrieves the offline model that defines the relative importance of soft constraints.
*/
class SoftConstraintWeightModel {
- private static Map<? extends SoftConstraint, Float> MODEL;
+ private static Map<Class, Float> MODEL;
// TODO either define the weights in property files or zookeeper node or static human input
SoftConstraintWeightModel() {
@@ -35,8 +35,9 @@ class SoftConstraintWeightModel {
}
static {
- MODEL = ImmutableMap.<SoftConstraint, Float> builder()
- .put(LeastPartitionCountConstraint.INSTANCE, 1.0f).build();
+ //TODO update the weight
+ MODEL = ImmutableMap.<Class, Float> builder().put(InstancePartitionsCountConstraint.class, 1.0f)
+ .build();
}
/**
@@ -48,9 +49,8 @@ class SoftConstraintWeightModel {
float sum = 0;
for (Map.Entry<SoftConstraint, Float> softConstraintScoreEntry : originScoresMap.entrySet()) {
SoftConstraint softConstraint = softConstraintScoreEntry.getKey();
- float score = softConstraint.getScalerFunction().scale(softConstraintScoreEntry.getValue());
- float weight = MODEL.get(softConstraint);
- sum += score * weight;
+ float weight = MODEL.get(softConstraint.getClass());
+ sum += softConstraintScoreEntry.getValue() * weight;
}
return sum;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index c163e4c..a0c841a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -19,8 +19,6 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
-import org.apache.helix.HelixException;
-
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,6 +27,8 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.helix.HelixException;
+
/**
* This class tracks the rebalance-related global cluster status.
*/
@@ -47,8 +47,7 @@ public class ClusterContext {
/**
* Construct the cluster context based on the current instance status.
- *
- * @param replicaSet All the partition replicas that are managed by the rebalancer
+ * @param replicaSet All the partition replicas that are managed by the rebalancer
* @param instanceCount The count of all the active instances that can be used to host partitions.
*/
ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
@@ -95,8 +94,8 @@ public class ClusterContext {
void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) {
if (!_assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>())
.computeIfAbsent(resourceName, k -> new HashSet<>()).add(partition)) {
- throw new HelixException(String
- .format("Resource %s already has a replica from partition %s in fault zone %s",
+ throw new HelixException(
+ String.format("Resource %s already has a replica from partition %s in fault zone %s",
resourceName, partition, faultZoneId));
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
index d06cc5f..0e61eb3 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
@@ -44,7 +44,7 @@ public class TestConstraintBasedAlgorithm {
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(false);
- when(mockSoftConstraint.getAssignmentOriginScore(any(), any(), any())).thenReturn(1.0f);
+ when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f);
_algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel);
@@ -62,7 +62,7 @@ public class TestConstraintBasedAlgorithm {
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(true);
- when(mockSoftConstraint.getAssignmentOriginScore(any(), any(), any())).thenReturn(1.0f);
+ when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f);
when(mockSoftConstraintWeightModel.getSumOfScores(any())).thenReturn(1.0f);
_algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestInstancePartitionsCountConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestInstancePartitionsCountConstraint.java
new file mode 100644
index 0000000..7ffc40b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestInstancePartitionsCountConstraint.java
@@ -0,0 +1,60 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestInstancePartitionsCountConstraint {
+ private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class);
+ private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+ private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class);
+
+ private final SoftConstraint _constraint = new InstancePartitionsCountConstraint();
+
+ @Test
+ public void testWhenInstanceIsIdle() {
+ when(_testNode.getAssignedReplicaCount()).thenReturn(0);
+ float score = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+ Assert.assertEquals(score, 1.0f);
+ }
+
+ @Test
+ public void testWhenInstanceIsFull() {
+ when(_testNode.getAssignedReplicaCount()).thenReturn(10);
+ when(_clusterContext.getEstimatedMaxPartitionCount()).thenReturn(10);
+ float score = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+ Assert.assertEquals(score, 0.5f);
+ }
+
+ @Test
+ public void testWhenInstanceHalfOccupied() {
+ when(_testNode.getAssignedReplicaCount()).thenReturn(10);
+ when(_clusterContext.getEstimatedMaxPartitionCount()).thenReturn(20);
+ float score = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+ Assert.assertEquals(score, 0.75f);
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSoftConstraintNormalizeFunction.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSoftConstraintNormalizeFunction.java
new file mode 100644
index 0000000..b523959
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSoftConstraintNormalizeFunction.java
@@ -0,0 +1,47 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSoftConstraintNormalizeFunction {
+ @Test
+ public void testDefaultNormalizeFunction() {
+ int maxScore = 100;
+ int minScore = 0;
+ SoftConstraint softConstraint = new SoftConstraint(maxScore, minScore) {
+ @Override
+ protected float getAssignmentScore(AssignableNode node, AssignableReplica replica,
+ ClusterContext clusterContext) {
+ return 0;
+ }
+ };
+
+ for (int i = minScore; i <= maxScore; i++) {
+ float normalized = softConstraint.getNormalizeFunction().scale(i);
+ Assert.assertTrue(normalized <= 1 && normalized >= 0,
+ String.format("input: %s, output: %s", i, normalized));
+ }
+ }
+}