You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ne...@apache.org on 2022/02/16 21:56:07 UTC

[helix] branch master updated: Use final remaining capacity when computing weighted score (#1961)

This is an automated email from the ASF dual-hosted git repository.

nealsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new db99f56  Use final remaining capacity when computing weighted score (#1961)
db99f56 is described below

commit db99f56083d246b208b8e92029593daf5d82a183
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Feb 16 13:52:48 2022 -0800

    Use final remaining capacity when computing weighted score (#1961)
    
    WAGED improvement: Use final remaining capacity when computing weighted score
---
 .../constraints/ConstraintBasedAlgorithm.java      | 59 ++++++++++------------
 .../rebalancer/waged/model/ClusterContext.java     | 32 +++++++++++-
 .../constraints/TestConstraintBasedAlgorithm.java  | 22 +++++++-
 .../waged/model/AbstractTestClusterModel.java      | 44 ++++++++++++++--
 .../waged/model/ClusterModelTestHelper.java        | 14 ++++-
 5 files changed, 131 insertions(+), 40 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index bb5df33..9065869 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
  * "hard constraints"
  */
 class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
+  private static final float DIV_GUARD = 0.01f;
   private static final Logger LOG = LoggerFactory.getLogger(ConstraintBasedAlgorithm.class);
   private final List<HardConstraint> _hardConstraints;
   private final Map<SoftConstraint, Float> _softConstraints;
@@ -70,15 +71,29 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
     Set<String> busyInstances =
         getBusyInstances(clusterModel.getContext().getBestPossibleAssignment().values());
 
-    // Compute overall utilization of the cluster. Capacity dimension -> total remaining capacity
-    Map<String, Integer> overallClusterRemainingCapacityMap =
-        computeOverallClusterRemainingCapacity(nodes);
+    // Create an always-positive (> 0) capacity map to avoid division by zero.
+    Map<String, Float> positiveEstimateClusterRemainCap = new HashMap<>();
+    for (Map.Entry<String, Integer> clusterRemainingCap : clusterModel.getContext()
+        .getEstimateUtilizationMap().entrySet()) {
+      String capacityKey = clusterRemainingCap.getKey();
+      if (clusterRemainingCap.getValue() < 0) {
+        // All replica assignments will fail if any dimension's remaining capacity is < 0.
+        throw new HelixRebalanceException(String
+            .format("The cluster does not have enough %s capacity for all partitions. ",
+                capacityKey), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+      }
+      // Estimated remaining capacity after assignment + 1% of the total cluster capacity.
+      positiveEstimateClusterRemainCap.put(capacityKey,
+           clusterModel.getContext().getEstimateUtilizationMap().get(capacityKey) +
+          (clusterModel.getContext().getClusterCapacityMap().get(capacityKey) * DIV_GUARD));
+    }
 
     // Create a wrapper for each AssignableReplica.
     List<AssignableReplicaWithScore> toBeAssignedReplicas =
         clusterModel.getAssignableReplicaMap().values().stream().flatMap(Collection::stream).map(
             replica -> new AssignableReplicaWithScore(replica, clusterModel,
-                overallClusterRemainingCapacityMap)).sorted().collect(Collectors.toList());
+                positiveEstimateClusterRemainCap)).sorted()
+            .collect(Collectors.toList());
 
     for (AssignableReplicaWithScore replicaWithScore : toBeAssignedReplicas) {
       AssignableReplica replica = replicaWithScore.getAssignableReplica();
@@ -139,7 +154,7 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
             int idleScore1 = busyInstances.contains(instanceName1) ? 0 : 1;
             int idleScore2 = busyInstances.contains(instanceName2) ? 0 : 1;
             return idleScore1 != idleScore2 ? (idleScore1 - idleScore2)
-                : - instanceName1.compareTo(instanceName2);
+                : -instanceName1.compareTo(instanceName2);
           } else {
             return scoreCompareResult;
           }
@@ -165,27 +180,15 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
         .collect(Collectors.toList());
   }
 
-  private Map<String, Integer> computeOverallClusterRemainingCapacity(List<AssignableNode> nodes) {
-    Map<String, Integer> utilizationMap = new HashMap<>();
-    for (AssignableNode node : nodes) {
-      for (String capacityKey : node.getMaxCapacity().keySet()) {
-        utilizationMap.compute(capacityKey,
-            (k, v) -> v == null ? node.getRemainingCapacity().get(capacityKey)
-                : v + node.getRemainingCapacity().get(capacityKey));
-      }
-    }
-    return utilizationMap;
-  }
-
   private class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
     private final AssignableReplica _replica;
     private float _score = 0;
     private final boolean _isInBestPossibleAssignment;
     private final boolean _isInBaselineAssignment;
-    private final Integer  _replicaHash;
+    private final Integer _replicaHash;
 
     AssignableReplicaWithScore(AssignableReplica replica, ClusterModel clusterModel,
-        Map<String, Integer> overallClusterRemainingCapacityMap) {
+        Map<String, Float> overallClusterRemainingCapacityMap) {
       _replica = replica;
       _isInBestPossibleAssignment = clusterModel.getContext().getBestPossibleAssignment()
           .containsKey(replica.getResourceName());
@@ -195,22 +198,14 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
       computeScore(overallClusterRemainingCapacityMap);
     }
 
-    public void computeScore(Map<String, Integer> overallClusterRemainingCapMap) {
+    public void computeScore(Map<String, Float> positiveEstimateClusterRemainCap) {
       float score = 0;
       // score = SUM(weight * (resource_capacity/cluster_capacity) where weight = 1/(1-total_util%)
       // it could be be simplified to "resource_capacity/cluster_remainingCapacity".
       for (Map.Entry<String, Integer> resourceCapacity : _replica.getCapacity().entrySet()) {
-        if (resourceCapacity.getValue() == 0) {
-          continue;
-        }
-        score = (overallClusterRemainingCapMap.get(resourceCapacity.getKey()) == 0
-            || resourceCapacity.getValue() > (overallClusterRemainingCapMap
-            .get(resourceCapacity.getKey()))) ? Float.MAX_VALUE
-            : score + (float) resourceCapacity.getValue() / (overallClusterRemainingCapMap
-                .get(resourceCapacity.getKey()));
-        if (Float.compare(score, Float.MAX_VALUE) == 0) {
-          break;
-        }
+        String capacityKey = resourceCapacity.getKey();
+        score +=
+            (float) resourceCapacity.getValue() / positiveEstimateClusterRemainCap.get(capacityKey);
       }
       _score = score;
     }
@@ -219,7 +214,6 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
       return _replica;
     }
 
-
     @Override
     public String toString() {
       return _replica.toString();
@@ -275,6 +269,7 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
     }
   }
 
+
   /**
    * @param assignments A collection of resource replicas assignment.
    * @return A set of instance names that have at least one replica assigned in the input assignments.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
index 5bfd4d0..24916e9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterContext.java
@@ -53,6 +53,10 @@ public class ClusterContext {
   private final Map<String, ResourceAssignment> _baselineAssignment;
   // <ResourceName, ResourceAssignment contains the best possible assignment>
   private final Map<String, ResourceAssignment> _bestPossibleAssignment;
+  // Estimated remaining capacity after assignment. Used to compute score when sorting replicas.
+  private final Map<String, Integer> _estimateUtilizationMap;
+  // Cluster total capacity. Used to compute score when sorting replicas.
+  private final Map<String, Integer> _clusterCapacityMap;
 
   /**
    * Construct the cluster context based on the current instance status.
@@ -99,9 +103,13 @@ public class ClusterContext {
       // If no capacity is configured, we treat the cluster as fully utilized.
       _estimatedMaxUtilization = 1f;
       _estimatedTopStateMaxUtilization = 1f;
+      _estimateUtilizationMap = Collections.emptyMap();
+      _clusterCapacityMap = Collections.emptyMap();
     } else {
       _estimatedMaxUtilization = estimateMaxUtilization(totalCapacity, totalUsage);
       _estimatedTopStateMaxUtilization = estimateMaxUtilization(totalCapacity, totalTopStateUsage);
+      _estimateUtilizationMap = estimateUtilization(totalCapacity, totalUsage);
+      _clusterCapacityMap = Collections.unmodifiableMap(totalCapacity);
     }
     _estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
     _estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
@@ -142,6 +150,14 @@ public class ClusterContext {
     return _estimatedTopStateMaxUtilization;
   }
 
+  public Map<String, Integer> getEstimateUtilizationMap() {
+    return _estimateUtilizationMap;
+  }
+
+  public Map<String, Integer> getClusterCapacityMap() {
+    return _clusterCapacityMap;
+  }
+
   public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, String faultZoneId) {
     return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
         .getOrDefault(resourceName, Collections.emptySet());
@@ -167,7 +183,7 @@ public class ClusterContext {
     _assignmentForFaultZoneMap = assignmentForFaultZoneMap;
   }
 
-  private int estimateAvgReplicaCount(int replicaCount, int instanceCount) {
+  private static int estimateAvgReplicaCount(int replicaCount, int instanceCount) {
     // Use the floor to ensure evenness.
     // Note if we calculate estimation based on ceil, we might have some low usage participants.
     // For example, if the evaluation is between 1 and 2. While we use 2, many participants will be
@@ -177,7 +193,7 @@ public class ClusterContext {
     return (int) Math.floor((float) replicaCount / instanceCount);
   }
 
-  private float estimateMaxUtilization(Map<String, Integer> totalCapacity,
+  private static float estimateMaxUtilization(Map<String, Integer> totalCapacity,
       Map<String, Integer> totalUsage) {
     float estimatedMaxUsage = 0;
     for (String capacityKey : totalCapacity.keySet()) {
@@ -189,4 +205,16 @@ public class ClusterContext {
 
     return estimatedMaxUsage;
   }
+
+  private static Map<String, Integer> estimateUtilization(Map<String, Integer> totalCapacity,
+      Map<String, Integer> totalUsage) {
+    Map<String, Integer> estimateUtilization = new HashMap<>();
+    for (String capacityKey : totalCapacity.keySet()) {
+      int maxCapacity = totalCapacity.get(capacityKey);
+      int usage = totalUsage.getOrDefault(capacityKey, 0);
+      estimateUtilization.put(capacityKey, maxCapacity - usage);
+    }
+
+    return Collections.unmodifiableMap(estimateUtilization);
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
index 84aeb20..3ba92ef 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
@@ -84,7 +84,7 @@ public class TestConstraintBasedAlgorithm {
         }));
   }
 
-  // Add capacity related hard/soft constrain to test sorting algorithm in ConstraintBasedAlgorithm.
+  // Add capacity related hard/soft constraint to test sorting algorithm in ConstraintBasedAlgorithm.
   @Test
   public void testSortingByResourceCapacity() throws IOException, HelixRebalanceException {
     HardConstraint nodeCapacityConstraint = new NodeCapacityConstraint();
@@ -98,4 +98,24 @@ public class TestConstraintBasedAlgorithm {
 
     Assert.assertFalse(optimalAssignment.hasAnyFailure());
   }
+
+  // Negative test for error handling in ConstraintBasedAlgorithm replica sorting.
+  @Test
+  public void testSortingEarlyQuitLackCapacity() throws IOException, HelixRebalanceException {
+    HardConstraint nodeCapacityConstraint = new NodeCapacityConstraint();
+    SoftConstraint soft1 = new MaxCapacityUsageInstanceConstraint();
+    SoftConstraint soft2 = new InstancePartitionsCountConstraint();
+    ConstraintBasedAlgorithm algorithm =
+        new ConstraintBasedAlgorithm(ImmutableList.of(nodeCapacityConstraint),
+            ImmutableMap.of(soft1, 1f, soft2, 1f));
+    ClusterModel clusterModel =
+        new ClusterModelTestHelper().getMultiNodeClusterModelNegativeSetup();
+    try {
+      OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel);
+    } catch (HelixRebalanceException ex) {
+      Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+      Assert.assertEquals(ex.getMessage(),
+          "The cluster does not have enough item1 capacity for all partitions.  Failure Type: FAILED_TO_CALCULATE");
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 7b5d63a..997232a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -223,7 +223,46 @@ public abstract class AbstractTestClusterModel {
     Map<String, ResourceConfig> configMap = testCache.getResourceConfigMap();
     configMap.put("Resource3", testResourceConfigResource3);
     when(testCache.getResourceConfigMap()).thenReturn(configMap);
-    return  testCache;
+    return testCache;
+  }
+
+  // Add another resource that has a large capacity requirement, for the negative test.
+  // TODO: this function duplicates many lines of the previous one. We should have a more
+  // generalized factory function instead.
+  protected ResourceControllerDataProvider setupClusterDataCacheForNoFitUtil() throws IOException {
+    _resourceNames.add("Resource4");
+    _partitionNames.add("Partition7");
+    _partitionNames.add("Partition8");
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+
+    CurrentState testCurrentStateResource4 = Mockito.mock(CurrentState.class);
+    Map<String, String> partitionStateMap4 = new HashMap<>();
+    partitionStateMap4.put(_partitionNames.get(4), "MASTER");
+    partitionStateMap4.put(_partitionNames.get(5), "SLAVE");
+    when(testCurrentStateResource4.getResourceName()).thenReturn(_resourceNames.get(2));
+    when(testCurrentStateResource4.getPartitionStateMap()).thenReturn(partitionStateMap4);
+    when(testCurrentStateResource4.getStateModelDefRef()).thenReturn("MasterSlave");
+    when(testCurrentStateResource4.getState(_partitionNames.get(4))).thenReturn("MASTER");
+    when(testCurrentStateResource4.getState(_partitionNames.get(5))).thenReturn("SLAVE");
+    when(testCurrentStateResource4.getSessionId()).thenReturn(_sessionId);
+
+    Map<String, CurrentState> currentStatemap =
+        testCache.getCurrentState(_testInstanceId, _sessionId);
+    currentStatemap.put(_resourceNames.get(2), testCurrentStateResource4);
+    when(testCache.getCurrentState(_testInstanceId, _sessionId)).thenReturn(currentStatemap);
+    when(testCache.getCurrentState(_testInstanceId, _sessionId, false)).thenReturn(currentStatemap);
+
+    Map<String, Integer> capacityDataMapResource3 = new HashMap<>();
+    capacityDataMapResource3.put("item1", 90);
+    capacityDataMapResource3.put("item2", 9);
+    ResourceConfig testResourceConfigResource3 = new ResourceConfig("Resource4");
+    testResourceConfigResource3.setPartitionCapacityMap(
+        Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource3));
+    when(testCache.getResourceConfig("Resource4")).thenReturn(testResourceConfigResource3);
+    Map<String, ResourceConfig> configMap = testCache.getResourceConfigMap();
+    configMap.put("Resource4", testResourceConfigResource3);
+    when(testCache.getResourceConfigMap()).thenReturn(configMap);
+    return testCache;
   }
 
   /**
@@ -237,8 +276,7 @@ public abstract class AbstractTestClusterModel {
     for (CurrentState cs : currentStatemap.values()) {
       ResourceConfig resourceConfig = dataProvider.getResourceConfig(cs.getResourceName());
       // Construct one AssignableReplica for each partition in the current state.
-      cs.getPartitionStateMap().entrySet().stream()
-          .forEach(entry -> assignmentSet
+      cs.getPartitionStateMap().entrySet().stream().forEach(entry -> assignmentSet
               .add(new AssignableReplica(dataProvider.getClusterConfig(), resourceConfig,
                   entry.getKey(), entry.getValue(), entry.getValue().equals("MASTER") ? 1 : 2)));
     }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
index 02ab0f5..e13fb39 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
@@ -48,7 +48,16 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
 
   public ClusterModel getMultiNodeClusterModel() throws IOException {
     initialize();
-    ResourceControllerDataProvider testCache = setupClusterDataCacheForNearFullUtil();
+    return getClusterHelper(setupClusterDataCacheForNearFullUtil());
+  }
+
+  public ClusterModel getMultiNodeClusterModelNegativeSetup() throws IOException {
+    initialize();
+    return getClusterHelper(setupClusterDataCacheForNoFitUtil());
+  }
+
+  private ClusterModel getClusterHelper(ResourceControllerDataProvider testCache)
+      throws IOException {
     InstanceConfig testInstanceConfig1 = createMockInstanceConfig(TEST_INSTANCE_ID_1);
     InstanceConfig testInstanceConfig2 = createMockInstanceConfig(TEST_INSTANCE_ID_2);
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
@@ -59,7 +68,8 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
     Set<AssignableNode> assignableNodes = generateNodes(testCache);
 
     ClusterContext context =
-        new ClusterContext(assignableReplicas, assignableNodes, Collections.emptyMap(), Collections.emptyMap());
+        new ClusterContext(assignableReplicas, assignableNodes, Collections.emptyMap(),
+            Collections.emptyMap());
     return new ClusterModel(context, assignableReplicas, assignableNodes);
   }
 }