You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:26 UTC

[helix] 23/37: Implement one of the soft constraints (#450)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 06cb6928563da2213ad532e85b5f096e0a387c9f
Author: Yi Wang <i3...@gmail.com>
AuthorDate: Tue Sep 10 15:37:15 2019 -0700

    Implement one of the soft constraints (#450)
    
    Implement Instance Partitions Count soft constraint.
    Evaluate by the instance's current partition count versus the estimated max partition count.
    Intuitively, encourage the assignment if the instance's occupancy rate is below average;
    discourage the assignment if the instance's occupancy rate is above average.
    
    The final normalized score will be within [0, 1].
    The implementation of the class will depend on the cluster's current total partition count as the max score.
---
 .../constraints/ConstraintBasedAlgorithm.java      |  2 +-
 .../InstancePartitionsCountConstraint.java         | 47 +++++++++++++++++
 .../constraints/LeastPartitionCountConstraint.java | 53 -------------------
 .../waged/constraints/SoftConstraint.java          | 40 ++++++++++-----
 .../constraints/SoftConstraintWeightModel.java     | 12 ++---
 .../rebalancer/waged/model/ClusterContext.java     | 11 ++--
 .../constraints/TestConstraintBasedAlgorithm.java  |  4 +-
 .../TestInstancePartitionsCountConstraint.java     | 60 ++++++++++++++++++++++
 .../TestSoftConstraintNormalizeFunction.java       | 47 +++++++++++++++++
 9 files changed, 196 insertions(+), 80 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 99d8d2a..479fb78 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -117,7 +117,7 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
     Function<AssignableNode, Float> calculatePoints =
         (candidateNode) -> _softConstraintsWeightModel.getSumOfScores(_softConstraints.stream()
             .collect(Collectors.toMap(Function.identity(), softConstraint -> softConstraint
-                .getAssignmentOriginScore(candidateNode, replica, clusterContext))));
+                .getAssignmentNormalizedScore(candidateNode, replica, clusterContext))));
 
     return candidateNodes.stream().max(Comparator.comparing(calculatePoints));
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/InstancePartitionsCountConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/InstancePartitionsCountConstraint.java
new file mode 100644
index 0000000..ca05cf8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/InstancePartitionsCountConstraint.java
@@ -0,0 +1,47 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+
+/**
+ * Evaluate by the instance's current partition count versus the estimated max partition count.
+ * Intuitively, encourage the assignment if the instance's occupancy rate is below average;
+ * discourage the assignment if the instance's occupancy rate is above average.
+ */
+class InstancePartitionsCountConstraint extends SoftConstraint {
+  private static final float MAX_SCORE = 1f;
+  private static final float MIN_SCORE = 0f;
+
+  InstancePartitionsCountConstraint() {
+    super(MAX_SCORE, MIN_SCORE);
+  }
+
+  @Override
+  protected float getAssignmentScore(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    float doubleEstimatedMaxPartitionCount = 2 * clusterContext.getEstimatedMaxPartitionCount();
+    float currentPartitionCount = node.getAssignedReplicaCount();
+    return Math.max((doubleEstimatedMaxPartitionCount - currentPartitionCount)
+        / doubleEstimatedMaxPartitionCount, 0);
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/LeastPartitionCountConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/LeastPartitionCountConstraint.java
deleted file mode 100644
index a8d36db..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/LeastPartitionCountConstraint.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package org.apache.helix.controller.rebalancer.waged.constraints;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
-import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
-import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
-
-/**
- * Evaluate the proposed assignment according to the instance's partition count.
- */
-class LeastPartitionCountConstraint extends SoftConstraint {
-  static LeastPartitionCountConstraint INSTANCE = new LeastPartitionCountConstraint();
-
-  private LeastPartitionCountConstraint() {
-  }
-
-  /**
-   * Returns a score depending on the number of assignments on this node. The score is scaled evenly
-   * between the minScore and maxScore.
-   * When the node is idle, return with the maxScore.
-   * When the node usage reaches the estimated max partition, return with (minScore + maxScore ) /
-   * 2.
-   * When the node usage reaches 2 * estimated_max or more, return with the minScore.
-   * If the estimated max partition count is not set, it defaults to Integer.MAX_VALUE in
-   * clusterContext.
-   */
-  @Override
-  float getAssignmentOriginScore(AssignableNode node, AssignableReplica replica,
-      ClusterContext clusterContext) {
-      throw new UnsupportedOperationException("The POC implementation has a bug, will fix it as TODO");
-//    float doubleMaxPartitionCount = 2.0f * clusterContext.getEstimatedMaxPartitionCount();
-//    int curPartitionCount = node.getCurrentAssignmentCount();
-//    return Math.max((doubleMaxPartitionCount - curPartitionCount) / doubleMaxPartitionCount, 0);
-  }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraint.java
index db145fe..0f2bdbc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraint.java
@@ -24,14 +24,15 @@ import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
 
 /**
- * Evaluate a partition allocation proposal and return a score within the normalized range.
- * A higher score means the proposal is more preferred.
+ * The "soft" constraint evaluates the optimality of an assignment by giving it a score on a scale of [minScore, maxScore].
+ * The higher the score, the better the assignment; intuitively, the assignment is encouraged.
+ * The lower the score, the worse the assignment; intuitively, the assignment is penalized.
  */
 abstract class SoftConstraint {
   private float _maxScore = 1000f;
   private float _minScore = -1000f;
 
-  interface ScalerFunction {
+  interface NormalizeFunction {
     /**
      * Scale the origin score to a normalized range (0, 1).
      * The purpose is to compare scores between different soft constraints.
@@ -57,23 +58,38 @@ abstract class SoftConstraint {
     _minScore = minScore;
   }
 
+  float getMaxScore() {
+    return _maxScore;
+  }
+
+  float getMinScore() {
+    return _minScore;
+  }
+
   /**
-   * The scoring function returns a score between MINIMAL_SCORE and MAXIMUM_SCORE, which is then
-   * weighted by the
-   * individual normalized constraint weights.
-   * Each individual constraint will define the meaning of MINIMAL_SCORE to MAXIMUM_SCORE
-   * differently.
-   * @return float value representing the score
+   * Evaluate and give a score for a potential assignment partition -> instance.
+   * Child classes only need to care about how the score is implemented.
+   * @return The score of the assignment as a float value
    */
-  abstract float getAssignmentOriginScore(AssignableNode node, AssignableReplica replica,
+  protected abstract float getAssignmentScore(AssignableNode node, AssignableReplica replica,
       ClusterContext clusterContext);
 
   /**
+   * Evaluate and give a score for a potential assignment partition -> instance.
+   * It's the only method exposed to the caller.
+   * @return The assignment score after the normalize function is applied; the default function maps [minScore, maxScore] to [0, 1]
+   */
+  float getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica,
+      ClusterContext clusterContext) {
+    return getNormalizeFunction().scale(getAssignmentScore(node, replica, clusterContext));
+  }
+
+  /**
    * The default scaler function that squashes any score within (min_score, max_score) to (0, 1);
    * Child class could override the method and customize the method on its own
    * @return The MinMaxScaler instance by default
    */
-  ScalerFunction getScalerFunction() {
-    return (score) -> (score - _minScore) / (_maxScore - _minScore);
+  NormalizeFunction getNormalizeFunction() {
+    return (score) -> (score - getMinScore()) / (getMaxScore() - getMinScore());
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java
index 41e4334..a961936 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java
@@ -27,7 +27,7 @@ import com.google.common.collect.ImmutableMap;
  * The class retrieves the offline model that defines the relative importance of soft constraints.
  */
 class SoftConstraintWeightModel {
-  private static Map<? extends SoftConstraint, Float> MODEL;
+  private static Map<Class, Float> MODEL;
 
   // TODO either define the weights in property files or zookeeper node or static human input
   SoftConstraintWeightModel() {
@@ -35,8 +35,9 @@ class SoftConstraintWeightModel {
   }
 
   static {
-    MODEL = ImmutableMap.<SoftConstraint, Float> builder()
-        .put(LeastPartitionCountConstraint.INSTANCE, 1.0f).build();
+    //TODO update the weight
+    MODEL = ImmutableMap.<Class, Float> builder().put(InstancePartitionsCountConstraint.class, 1.0f)
+        .build();
   }
 
   /**
@@ -48,9 +49,8 @@ class SoftConstraintWeightModel {
     float sum = 0;
     for (Map.Entry<SoftConstraint, Float> softConstraintScoreEntry : originScoresMap.entrySet()) {
       SoftConstraint softConstraint = softConstraintScoreEntry.getKey();
-      float score = softConstraint.getScalerFunction().scale(softConstraintScoreEntry.getValue());
-      float weight = MODEL.get(softConstraint);
-      sum += score * weight;
+      float weight = MODEL.get(softConstraint.getClass());
+      sum += softConstraintScoreEntry.getValue() * weight;
     }
 
     return sum;
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index c163e4c..a0c841a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -19,8 +19,6 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import org.apache.helix.HelixException;
-
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,6 +27,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.helix.HelixException;
+
 /**
  * This class tracks the rebalance-related global cluster status.
  */
@@ -47,8 +47,7 @@ public class ClusterContext {
 
   /**
    * Construct the cluster context based on the current instance status.
-   *
-   * @param replicaSet    All the partition replicas that are managed by the rebalancer
+   * @param replicaSet All the partition replicas that are managed by the rebalancer
    * @param instanceCount The count of all the active instances that can be used to host partitions.
    */
   ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
@@ -95,8 +94,8 @@ public class ClusterContext {
   void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) {
     if (!_assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>())
         .computeIfAbsent(resourceName, k -> new HashSet<>()).add(partition)) {
-      throw new HelixException(String
-          .format("Resource %s already has a replica from partition %s in fault zone %s",
+      throw new HelixException(
+          String.format("Resource %s already has a replica from partition %s in fault zone %s",
               resourceName, partition, faultZoneId));
     }
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
index d06cc5f..0e61eb3 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
@@ -44,7 +44,7 @@ public class TestConstraintBasedAlgorithm {
     SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
     SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class);
     when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(false);
-    when(mockSoftConstraint.getAssignmentOriginScore(any(), any(), any())).thenReturn(1.0f);
+    when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f);
 
     _algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
         ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel);
@@ -62,7 +62,7 @@ public class TestConstraintBasedAlgorithm {
     SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
     SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class);
     when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(true);
-    when(mockSoftConstraint.getAssignmentOriginScore(any(), any(), any())).thenReturn(1.0f);
+    when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f);
     when(mockSoftConstraintWeightModel.getSumOfScores(any())).thenReturn(1.0f);
     _algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
         ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestInstancePartitionsCountConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestInstancePartitionsCountConstraint.java
new file mode 100644
index 0000000..7ffc40b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestInstancePartitionsCountConstraint.java
@@ -0,0 +1,60 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.mockito.Mockito.when;
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestInstancePartitionsCountConstraint {
+  private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class);
+  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
+  private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class);
+
+  private final SoftConstraint _constraint = new InstancePartitionsCountConstraint();
+
+  @Test
+  public void testWhenInstanceIsIdle() {
+    when(_testNode.getAssignedReplicaCount()).thenReturn(0);
+    float score = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+    Assert.assertEquals(score, 1.0f);
+  }
+
+  @Test
+  public void testWhenInstanceIsFull() {
+    when(_testNode.getAssignedReplicaCount()).thenReturn(10);
+    when(_clusterContext.getEstimatedMaxPartitionCount()).thenReturn(10);
+    float score = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+    Assert.assertEquals(score, 0.5f);
+  }
+
+  @Test
+  public void testWhenInstanceHalfOccupied() {
+    when(_testNode.getAssignedReplicaCount()).thenReturn(10);
+    when(_clusterContext.getEstimatedMaxPartitionCount()).thenReturn(20);
+    float score = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
+    Assert.assertEquals(score, 0.75f);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSoftConstraintNormalizeFunction.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSoftConstraintNormalizeFunction.java
new file mode 100644
index 0000000..b523959
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestSoftConstraintNormalizeFunction.java
@@ -0,0 +1,47 @@
+package org.apache.helix.controller.rebalancer.waged.constraints;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSoftConstraintNormalizeFunction {
+  @Test
+  public void testDefaultNormalizeFunction() {
+    int maxScore = 100;
+    int minScore = 0;
+    SoftConstraint softConstraint = new SoftConstraint(maxScore, minScore) {
+      @Override
+      protected float getAssignmentScore(AssignableNode node, AssignableReplica replica,
+          ClusterContext clusterContext) {
+        return 0;
+      }
+    };
+
+    for (int i = minScore; i <= maxScore; i++) {
+      float normalized = softConstraint.getNormalizeFunction().scale(i);
+      Assert.assertTrue(normalized <= 1 && normalized >= 0,
+          String.format("input: %s, output: %s", i, normalized));
+    }
+  }
+}