You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2021/04/24 04:42:35 UTC

[helix] branch wagedImprove updated: Improve the WAGED.ConstraintBasedAlgorithm sorting logic to prioritize replica with larger impact (#1691)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedImprove
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedImprove by this push:
     new 47b30a9  Improve the WAGED.ConstraintBasedAlgorithm sorting logic to prioritize replica with larger impact (#1691)
47b30a9 is described below

commit 47b30a9f13534aa30edbced88ecaa30192b10d5d
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Fri Apr 23 21:42:23 2021 -0700

    Improve the WAGED.ConstraintBasedAlgorithm sorting logic to prioritize replica with larger impact (#1691)
    
    Improve the WAGED.ConstraintBasedAlgorithm sorting logic to prioritize replica with larger impact.
---
 .../constraints/ConstraintBasedAlgorithm.java      | 222 ++++++++++++++-------
 .../constraints/TestConstraintBasedAlgorithm.java  |  15 ++
 .../waged/model/AbstractTestClusterModel.java      |  42 ++++
 .../waged/model/ClusterModelTestHelper.java        |   2 +-
 .../WagedRebalancer/TestWagedRebalance.java        |   2 +-
 5 files changed, 213 insertions(+), 70 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 2f6c9cd..89730c6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -42,6 +42,7 @@ import org.apache.helix.model.ResourceAssignment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * The algorithm is based on a given set of constraints
  * - HardConstraint: Approve or deny the assignment given its condition, any assignment cannot
@@ -68,23 +69,38 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
     List<AssignableNode> nodes = new ArrayList<>(clusterModel.getAssignableNodes().values());
     Set<String> busyInstances =
         getBusyInstances(clusterModel.getContext().getBestPossibleAssignment().values());
-    // Sort the replicas so the input is stable for the greedy algorithm.
-    // For the other algorithm implementation, this sorting could be unnecessary.
-    for (AssignableReplica replica : getOrderedAssignableReplica(clusterModel)) {
+
+    // Compute overall utilization of the cluster. Capacity dimension -> total remaining capacity
+    Map<String, Integer> overallClusterRemainingCapacityMap =
+        computeOverallClusterRemainingCapacity(nodes);
+
+    // Create a wrapper for each AssignableReplica.
+    Set<AssignableReplicaWithScore> toBeAssignedReplicas =
+        clusterModel.getAssignableReplicaMap().values().stream()
+            .flatMap(replicas -> replicas.stream())
+            .map(replica -> new AssignableReplicaWithScore(replica, clusterModel))
+            .collect(Collectors.toSet());
+
+    while (!toBeAssignedReplicas.isEmpty()) {
+      AssignableReplica replica =
+          getNextAssignableReplica(toBeAssignedReplicas, overallClusterRemainingCapacityMap);
       Optional<AssignableNode> maybeBestNode =
           getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), busyInstances,
               optimalAssignment);
       // stop immediately if any replica cannot find best assignable node
-      if (optimalAssignment.hasAnyFailure()) {
-        String errorMessage = String
-            .format("Unable to find any available candidate node for partition %s; Fail reasons: %s",
+      if (!maybeBestNode.isPresent() || optimalAssignment.hasAnyFailure()) {
+        String errorMessage = String.format(
+            "Unable to find any available candidate node for partition %s; Fail reasons: %s",
             replica.getPartitionName(), optimalAssignment.getFailures());
         throw new HelixRebalanceException(errorMessage,
             HelixRebalanceException.Type.FAILED_TO_CALCULATE);
       }
-      maybeBestNode.ifPresent(node -> clusterModel
+      AssignableNode bestNode = maybeBestNode.get();
+      // Assign the replica and update the cluster model.
+      clusterModel
           .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(),
-              node.getInstanceName()));
+              bestNode.getInstanceName());
+      updateOverallClusterRemainingCapacity(overallClusterRemainingCapacityMap, replica);
     }
     optimalAssignment.updateAssignments(clusterModel);
     return optimalAssignment;
@@ -152,69 +168,139 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
         .collect(Collectors.toList());
   }
 
-  private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) {
-    Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
-    List<AssignableReplica> orderedAssignableReplicas =
-        replicasByResource.values().stream().flatMap(replicas -> replicas.stream())
-            .collect(Collectors.toList());
-
-    Map<String, ResourceAssignment> bestPossibleAssignment =
-        clusterModel.getContext().getBestPossibleAssignment();
-    Map<String, ResourceAssignment> baselineAssignment =
-        clusterModel.getContext().getBaselineAssignment();
-
-    Map<String, Integer> replicaHashCodeMap = orderedAssignableReplicas.parallelStream().collect(
-        Collectors.toMap(AssignableReplica::toString,
-            replica -> Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet()),
-            (hash1, hash2) -> hash2));
-
-    // 1. Sort according if the assignment exists in the best possible and/or baseline assignment
-    // 2. Sort according to the state priority. Note that prioritizing the top state is required.
-    // Or the greedy algorithm will unnecessarily shuffle the states between replicas.
-    // 3. Sort according to the resource/partition name.
-    orderedAssignableReplicas.sort((replica1, replica2) -> {
-      String resourceName1 = replica1.getResourceName();
-      String resourceName2 = replica2.getResourceName();
-      if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment
-          .containsKey(resourceName2)) {
-        if (baselineAssignment.containsKey(resourceName1) == baselineAssignment
-            .containsKey(resourceName2)) {
-          // If both assignment states have/not have the resource assignment the same,
-          // compare for additional dimensions.
-          int statePriority1 = replica1.getStatePriority();
-          int statePriority2 = replica2.getStatePriority();
-          if (statePriority1 == statePriority2) {
-            // If state priorities are the same, try to randomize the replicas order. Otherwise,
-            // the same replicas might always be moved in each rebalancing. This is because their
-            // placement calculating will always happen at the critical moment while the cluster is
-            // almost close to the expected utilization.
-            //
-            // Note that to ensure the algorithm is deterministic with the same inputs, do not use
-            // Random functions here. Use hashcode based on the cluster topology information to get
-            // a controlled randomized order is good enough.
-            Integer replicaHash1 = replicaHashCodeMap.get(replica1.toString());
-            Integer replicaHash2 = replicaHashCodeMap.get(replica2.toString());
-            if (!replicaHash1.equals(replicaHash2)) {
-              return replicaHash1.compareTo(replicaHash2);
-            } else {
-              // In case of hash collision, return order according to the name.
-              return replica1.toString().compareTo(replica2.toString());
-            }
-          } else {
-            // Note we shall prioritize the replica with a higher state priority,
-            // the smaller priority number means higher priority.
-            return statePriority1 - statePriority2;
-          }
-        } else {
-          // If the baseline assignment contains the assignment, prioritize the replica.
-          return baselineAssignment.containsKey(resourceName1) ? -1 : 1;
+  private Map<String, Integer> computeOverallClusterRemainingCapacity(List<AssignableNode> nodes) {
+    Map<String, Integer> utilizationMap = new HashMap<>();
+    for (AssignableNode node : nodes) {
+      for (String capacityKey : node.getMaxCapacity().keySet()) {
+        utilizationMap.compute(capacityKey,
+            (k, v) -> v == null ? node.getRemainingCapacity().get(capacityKey)
+                : v + node.getRemainingCapacity().get(capacityKey));
+      }
+    }
+    return utilizationMap;
+  }
+
+  /**
+   * Update the overallClusterUtilMap with newly placed replica
+   */
+  private void updateOverallClusterRemainingCapacity(
+      Map<String, Integer> overallClusterRemainingCapacityMap, AssignableReplica replica) {
+    for (Map.Entry<String, Integer> resourceUsage : replica.getCapacity().entrySet()) {
+      overallClusterRemainingCapacityMap.put(resourceUsage.getKey(),
+          overallClusterRemainingCapacityMap.get(resourceUsage.getKey()) - resourceUsage
+              .getValue());
+    }
+  }
+
+  private class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
+    private final AssignableReplica _replica;
+    private float _score = 0;
+    private final boolean _isInBestPossibleAssignment;
+    private final boolean _isInBaselineAssignment;
+    private final Integer  _replicaHash;
+
+    AssignableReplicaWithScore(AssignableReplica replica, ClusterModel clusterModel) {
+      _replica = replica;
+      _isInBestPossibleAssignment = clusterModel.getContext().getBestPossibleAssignment()
+          .containsKey(replica.getResourceName());
+      _isInBaselineAssignment =
+          clusterModel.getContext().getBaselineAssignment().containsKey(replica.getResourceName());
+      _replicaHash = Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet());
+    }
+
+    public void computeScore(Map<String, Integer> overallClusterRemainingCapMap) {
+      float score = 0;
+      // score = SUM(weight * (resource_capacity/cluster_capacity) where weight = 1/(1-total_util%)
+      // it could be be simplified to "resource_capacity/cluster_remainingCapacity".
+      for (Map.Entry<String, Integer> resourceCapacity : _replica.getCapacity().entrySet()) {
+        if (resourceCapacity.getValue() == 0) {
+          continue;
         }
+        score = (overallClusterRemainingCapMap.get(resourceCapacity.getKey()) == 0
+            || resourceCapacity.getValue() > (overallClusterRemainingCapMap
+            .get(resourceCapacity.getKey()))) ? Float.MAX_VALUE
+            : score + (float) resourceCapacity.getValue() / (overallClusterRemainingCapMap
+                .get(resourceCapacity.getKey()));
+        if (Float.compare(score, Float.MAX_VALUE) == 0) {
+          break;
+        }
+      }
+      _score = score;
+    }
+
+    public AssignableReplica getAssignableReplica() {
+      return _replica;
+    }
+
+
+    @Override
+    public String toString() {
+      return _replica.toString();
+    }
+
+    @Override
+    public int compareTo(AssignableReplicaWithScore replica2) {
+      // 1. Sort according if the assignment exists in the best possible and/or baseline assignment
+      if (_isInBestPossibleAssignment != replica2._isInBestPossibleAssignment) {
+        // If the best possible assignment contains only one replica's assignment,
+        // prioritize the replica.
+        return _isInBestPossibleAssignment ? -1 : 1;
+      }
+
+      if (_isInBaselineAssignment != replica2._isInBaselineAssignment) {
+        // If the baseline assignment contains only one replica's assignment, prioritize the replica.
+        return _isInBaselineAssignment ? -1 : 1;
+      }
+
+      // 2. Sort according to the state priority. Or the greedy algorithm will unnecessarily shuffle
+      // the states between replicas.
+      int statePriority1 = _replica.getStatePriority();
+      int statePriority2 = replica2._replica.getStatePriority();
+      if (statePriority1 != statePriority2) {
+        // Note we shall prioritize the replica with a higher state priority,
+        // the smaller priority number means higher priority.
+        return statePriority1 - statePriority2;
+      }
+
+      // 3. Sort according to the replica impact based on the weight.
+      // So the greedy algorithm will place the replicas with larger impact first.
+      int result = Float.compare(replica2._score, _score);
+      if (result != 0) {
+        return result;
+      }
+
+      // 4. Sort according to the resource/partition name.
+      // If none of the above conditions is making a difference, try to randomize the replicas
+      // order.
+      // Otherwise, the same replicas might always be moved in each rebalancing. This is because
+      // their placement calculating will always happen at the critical moment while the cluster is
+      // almost close to the expected utilization.
+      //
+      // Note that to ensure the algorithm is deterministic with the same inputs, do not use
+      // Random functions here. Use hashcode based on the cluster topology information to get
+      // a controlled randomized order is good enough.
+      if (!_replicaHash.equals(replica2._replicaHash)) {
+        return _replicaHash.compareTo(replica2._replicaHash);
       } else {
-        // If the best possible assignment contains the assignment, prioritize the replica.
-        return bestPossibleAssignment.containsKey(resourceName1) ? -1 : 1;
+        // In case of hash collision, return order according to the name.
+        return _replica.toString().compareTo(replica2.toString());
       }
-    });
-    return orderedAssignableReplicas;
+    }
+  }
+
+  private AssignableReplica getNextAssignableReplica(
+      Set<AssignableReplicaWithScore> allReplica,
+      Map<String, Integer> overallClusterRemainingCapMap) {
+    AssignableReplicaWithScore nextAssinableReplica = null;
+    // Compare every replica with current candidate, update candidate if needed
+    for (AssignableReplicaWithScore replica : allReplica) {
+      replica.computeScore(overallClusterRemainingCapMap);
+      if (nextAssinableReplica == null || replica.compareTo(nextAssinableReplica) < 0) {
+        nextAssinableReplica = replica;
+      }
+    }
+    allReplica.remove(nextAssinableReplica);
+    return nextAssinableReplica.getAssignableReplica();
   }
 
   /**
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
index 3954407..84aeb20 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java
@@ -83,4 +83,19 @@ public class TestConstraintBasedAlgorithm {
               .containsKey(ClusterModelTestHelper.TEST_INSTANCE_ID_1));
         }));
   }
+
+  // Add capacity related hard/soft constrain to test sorting algorithm in ConstraintBasedAlgorithm.
+  @Test
+  public void testSortingByResourceCapacity() throws IOException, HelixRebalanceException {
+    HardConstraint nodeCapacityConstraint = new NodeCapacityConstraint();
+    SoftConstraint soft1 = new MaxCapacityUsageInstanceConstraint();
+    SoftConstraint soft2 = new InstancePartitionsCountConstraint();
+    ConstraintBasedAlgorithm algorithm =
+        new ConstraintBasedAlgorithm(ImmutableList.of(nodeCapacityConstraint),
+            ImmutableMap.of(soft1, 1f, soft2, 1f));
+    ClusterModel clusterModel = new ClusterModelTestHelper().getMultiNodeClusterModel();
+    OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel);
+
+    Assert.assertFalse(optimalAssignment.hasAnyFailure());
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index e3b346d..7b5d63a 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -164,6 +164,7 @@ public abstract class AbstractTestClusterModel {
     testResourceConfigResource1.setPartitionCapacityMap(
         Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource1));
     when(testCache.getResourceConfig("Resource1")).thenReturn(testResourceConfigResource1);
+
     Map<String, Integer> capacityDataMapResource2 = new HashMap<>();
     capacityDataMapResource2.put("item1", 5);
     capacityDataMapResource2.put("item2", 10);
@@ -184,6 +185,47 @@ public abstract class AbstractTestClusterModel {
     return testCache;
   }
 
+  // Add another resource. When compute, the two smaller resources' Master partition should be
+  // assigned to one instance and the relatively larger one's Master partition should be assigned to
+  // another.
+  // The sorting algorithm in ConstraintBasedAlgorithm should garnette these 2 smaller resources
+  // are placed after the larger one.
+  // This is the only way to accommodate all 6 partitions.
+  protected ResourceControllerDataProvider setupClusterDataCacheForNearFullUtil() throws IOException {
+    _resourceNames.add("Resource3");
+    _partitionNames.add("Partition5");
+    _partitionNames.add("Partition6");
+    ResourceControllerDataProvider testCache = setupClusterDataCache();
+
+    CurrentState testCurrentStateResource3 = Mockito.mock(CurrentState.class);
+    Map<String, String> partitionStateMap3 = new HashMap<>();
+    partitionStateMap3.put(_partitionNames.get(4), "MASTER");
+    partitionStateMap3.put(_partitionNames.get(5), "SLAVE");
+    when(testCurrentStateResource3.getResourceName()).thenReturn(_resourceNames.get(2));
+    when(testCurrentStateResource3.getPartitionStateMap()).thenReturn(partitionStateMap3);
+    when(testCurrentStateResource3.getStateModelDefRef()).thenReturn("MasterSlave");
+    when(testCurrentStateResource3.getState(_partitionNames.get(4))).thenReturn("MASTER");
+    when(testCurrentStateResource3.getState(_partitionNames.get(5))).thenReturn("SLAVE");
+    when(testCurrentStateResource3.getSessionId()).thenReturn(_sessionId);
+
+    Map<String, CurrentState> currentStatemap = testCache.getCurrentState(_testInstanceId, _sessionId);
+    currentStatemap.put(_resourceNames.get(2), testCurrentStateResource3);
+    when(testCache.getCurrentState(_testInstanceId, _sessionId)).thenReturn(currentStatemap);
+    when(testCache.getCurrentState(_testInstanceId, _sessionId, false)).thenReturn(currentStatemap);
+
+    Map<String, Integer> capacityDataMapResource3 = new HashMap<>();
+    capacityDataMapResource3.put("item1", 9);
+    capacityDataMapResource3.put("item2", 17);
+    ResourceConfig testResourceConfigResource3 = new ResourceConfig("Resource3");
+    testResourceConfigResource3.setPartitionCapacityMap(
+        Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource3));
+    when(testCache.getResourceConfig("Resource3")).thenReturn(testResourceConfigResource3);
+    Map<String, ResourceConfig> configMap = testCache.getResourceConfigMap();
+    configMap.put("Resource3", testResourceConfigResource3);
+    when(testCache.getResourceConfigMap()).thenReturn(configMap);
+    return  testCache;
+  }
+
   /**
    * Generate the replica objects according to the provider information.
    */
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
index 1c1687e..02ab0f5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelTestHelper.java
@@ -48,7 +48,7 @@ public class ClusterModelTestHelper extends AbstractTestClusterModel {
 
   public ClusterModel getMultiNodeClusterModel() throws IOException {
     initialize();
-    ResourceControllerDataProvider testCache = setupClusterDataCache();
+    ResourceControllerDataProvider testCache = setupClusterDataCacheForNearFullUtil();
     InstanceConfig testInstanceConfig1 = createMockInstanceConfig(TEST_INSTANCE_ID_1);
     InstanceConfig testInstanceConfig2 = createMockInstanceConfig(TEST_INSTANCE_ID_2);
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index bba94fc..bb58d86 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -732,7 +732,6 @@ public class TestWagedRebalance extends ZkTestBase {
     for (String db : _allDBs) {
       _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
     }
-    _allDBs.clear();
     // waiting for all DB be dropped.
     ZkHelixClusterVerifier _clusterVerifier =
         new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
@@ -741,6 +740,7 @@ public class TestWagedRebalance extends ZkTestBase {
             .build();
     try {
       Assert.assertTrue(_clusterVerifier.verifyByPolling());
+      _allDBs.clear();
     } finally {
       _clusterVerifier.close();
     }