You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2021/04/19 17:26:11 UTC

[GitHub] [helix] jiajunwang commented on a change in pull request #1691: Improve the WAGED.ConstraintBasedAlgorithm sorting logic to prioritize replica with larger impact

jiajunwang commented on a change in pull request #1691:
URL: https://github.com/apache/helix/pull/1691#discussion_r616031829



##########
File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
##########
@@ -152,69 +172,136 @@ private double getAssignmentNormalizedScore(AssignableNode node, AssignableRepli
         .collect(Collectors.toList());
   }
 
-  private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) {
-    Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
-    List<AssignableReplica> orderedAssignableReplicas =
-        replicasByResource.values().stream().flatMap(replicas -> replicas.stream())
-            .collect(Collectors.toList());
-
-    Map<String, ResourceAssignment> bestPossibleAssignment =
-        clusterModel.getContext().getBestPossibleAssignment();
-    Map<String, ResourceAssignment> baselineAssignment =
-        clusterModel.getContext().getBaselineAssignment();
-
-    Map<String, Integer> replicaHashCodeMap = orderedAssignableReplicas.parallelStream().collect(
-        Collectors.toMap(AssignableReplica::toString,
-            replica -> Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet()),
-            (hash1, hash2) -> hash2));
-
-    // 1. Sort according if the assignment exists in the best possible and/or baseline assignment
-    // 2. Sort according to the state priority. Note that prioritizing the top state is required.
-    // Or the greedy algorithm will unnecessarily shuffle the states between replicas.
-    // 3. Sort according to the resource/partition name.
-    orderedAssignableReplicas.sort((replica1, replica2) -> {
-      String resourceName1 = replica1.getResourceName();
-      String resourceName2 = replica2.getResourceName();
-      if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment
-          .containsKey(resourceName2)) {
-        if (baselineAssignment.containsKey(resourceName1) == baselineAssignment
-            .containsKey(resourceName2)) {
-          // If both assignment states have/not have the resource assignment the same,
-          // compare for additional dimensions.
-          int statePriority1 = replica1.getStatePriority();
-          int statePriority2 = replica2.getStatePriority();
-          if (statePriority1 == statePriority2) {
-            // If state priorities are the same, try to randomize the replicas order. Otherwise,
-            // the same replicas might always be moved in each rebalancing. This is because their
-            // placement calculating will always happen at the critical moment while the cluster is
-            // almost close to the expected utilization.
-            //
-            // Note that to ensure the algorithm is deterministic with the same inputs, do not use
-            // Random functions here. Use hashcode based on the cluster topology information to get
-            // a controlled randomized order is good enough.
-            Integer replicaHash1 = replicaHashCodeMap.get(replica1.toString());
-            Integer replicaHash2 = replicaHashCodeMap.get(replica2.toString());
-            if (!replicaHash1.equals(replicaHash2)) {
-              return replicaHash1.compareTo(replicaHash2);
-            } else {
-              // In case of hash collision, return order according to the name.
-              return replica1.toString().compareTo(replica2.toString());
-            }
-          } else {
-            // Note we shall prioritize the replica with a higher state priority,
-            // the smaller priority number means higher priority.
-            return statePriority1 - statePriority2;
-          }
-        } else {
-          // If the baseline assignment contains the assignment, prioritize the replica.
-          return baselineAssignment.containsKey(resourceName1) ? -1 : 1;
+  private Map<String, Integer> computeOverallClusterRemainingCapacity(List<AssignableNode> nodes) {
+    Map<String, Integer> utilizationMap = new HashMap<>();
+    for (AssignableNode node : nodes) {
+      for (String capacityKey : node.getMaxCapacity().keySet()) {
+        utilizationMap.compute(capacityKey,
+            (k, v) -> v == null ? node.getRemainingCapacity().get(capacityKey)
+                : v + node.getRemainingCapacity().get(capacityKey));
+      }
+    }
+    return utilizationMap;
+  }
+
+  /**
+   * Update the overallClusterUtilMap with newly placed replica
+   */
+  private void updateOverallClusterRemainingCapacity(
+      Map<String, Integer> overallClusterRemainingCapacityMap, AssignableReplica replica) {
+    for (Map.Entry<String, Integer> resourceUsage : replica.getCapacity().entrySet()) {
+      Integer newRemainingCapacity =
+          overallClusterRemainingCapacityMap.get(resourceUsage.getKey()) - resourceUsage.getValue();
+      overallClusterRemainingCapacityMap.put(resourceUsage.getKey(), newRemainingCapacity);
+    }
+  }
+
+  private class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
+    private final AssignableReplica _replica;
+    private float _score = 0;
+    private boolean _isInBestPossibleAssignment;

Review comment:
       nit, final?

##########
File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
##########
@@ -152,69 +172,136 @@ private double getAssignmentNormalizedScore(AssignableNode node, AssignableRepli
         .collect(Collectors.toList());
   }
 
-  private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) {
-    Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
-    List<AssignableReplica> orderedAssignableReplicas =
-        replicasByResource.values().stream().flatMap(replicas -> replicas.stream())
-            .collect(Collectors.toList());
-
-    Map<String, ResourceAssignment> bestPossibleAssignment =
-        clusterModel.getContext().getBestPossibleAssignment();
-    Map<String, ResourceAssignment> baselineAssignment =
-        clusterModel.getContext().getBaselineAssignment();
-
-    Map<String, Integer> replicaHashCodeMap = orderedAssignableReplicas.parallelStream().collect(
-        Collectors.toMap(AssignableReplica::toString,
-            replica -> Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet()),
-            (hash1, hash2) -> hash2));
-
-    // 1. Sort according if the assignment exists in the best possible and/or baseline assignment
-    // 2. Sort according to the state priority. Note that prioritizing the top state is required.
-    // Or the greedy algorithm will unnecessarily shuffle the states between replicas.
-    // 3. Sort according to the resource/partition name.
-    orderedAssignableReplicas.sort((replica1, replica2) -> {
-      String resourceName1 = replica1.getResourceName();
-      String resourceName2 = replica2.getResourceName();
-      if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment
-          .containsKey(resourceName2)) {
-        if (baselineAssignment.containsKey(resourceName1) == baselineAssignment
-            .containsKey(resourceName2)) {
-          // If both assignment states have/not have the resource assignment the same,
-          // compare for additional dimensions.
-          int statePriority1 = replica1.getStatePriority();
-          int statePriority2 = replica2.getStatePriority();
-          if (statePriority1 == statePriority2) {
-            // If state priorities are the same, try to randomize the replicas order. Otherwise,
-            // the same replicas might always be moved in each rebalancing. This is because their
-            // placement calculating will always happen at the critical moment while the cluster is
-            // almost close to the expected utilization.
-            //
-            // Note that to ensure the algorithm is deterministic with the same inputs, do not use
-            // Random functions here. Use hashcode based on the cluster topology information to get
-            // a controlled randomized order is good enough.
-            Integer replicaHash1 = replicaHashCodeMap.get(replica1.toString());
-            Integer replicaHash2 = replicaHashCodeMap.get(replica2.toString());
-            if (!replicaHash1.equals(replicaHash2)) {
-              return replicaHash1.compareTo(replicaHash2);
-            } else {
-              // In case of hash collision, return order according to the name.
-              return replica1.toString().compareTo(replica2.toString());
-            }
-          } else {
-            // Note we shall prioritize the replica with a higher state priority,
-            // the smaller priority number means higher priority.
-            return statePriority1 - statePriority2;
-          }
-        } else {
-          // If the baseline assignment contains the assignment, prioritize the replica.
-          return baselineAssignment.containsKey(resourceName1) ? -1 : 1;
+  private Map<String, Integer> computeOverallClusterRemainingCapacity(List<AssignableNode> nodes) {
+    Map<String, Integer> utilizationMap = new HashMap<>();
+    for (AssignableNode node : nodes) {
+      for (String capacityKey : node.getMaxCapacity().keySet()) {
+        utilizationMap.compute(capacityKey,
+            (k, v) -> v == null ? node.getRemainingCapacity().get(capacityKey)
+                : v + node.getRemainingCapacity().get(capacityKey));
+      }
+    }
+    return utilizationMap;
+  }
+
+  /**
+   * Update the overallClusterUtilMap with newly placed replica
+   */
+  private void updateOverallClusterRemainingCapacity(
+      Map<String, Integer> overallClusterRemainingCapacityMap, AssignableReplica replica) {
+    for (Map.Entry<String, Integer> resourceUsage : replica.getCapacity().entrySet()) {
+      Integer newRemainingCapacity =
+          overallClusterRemainingCapacityMap.get(resourceUsage.getKey()) - resourceUsage.getValue();
+      overallClusterRemainingCapacityMap.put(resourceUsage.getKey(), newRemainingCapacity);
+    }
+  }
+
+  private class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
+    private final AssignableReplica _replica;
+    private float _score = 0;
+    private boolean _isInBestPossibleAssignment;
+    private boolean _isInBaselineAssignment;
+    private final Integer  _replicaHash;
+
+    AssignableReplicaWithScore(AssignableReplica replica, ClusterModel clusterModel) {
+      _replica = replica;
+      _isInBestPossibleAssignment = clusterModel.getContext().getBestPossibleAssignment()
+          .containsKey(replica.getResourceName());
+      _isInBaselineAssignment =
+          clusterModel.getContext().getBaselineAssignment().containsKey(replica.getResourceName());
+      _replicaHash = Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet());
+    }
+
+    public void computeScore(Map<String, Integer> overallClusterRemainingCapMap) {
+      float score = 0;
+      // score = SUM(weight * (resource_capacity/cluster_capacity) where weight = 1/(1-total_util%)
+      // it could be be simplified to "resource_capacity/cluster_remainingCapacity".
+      for (Map.Entry<String, Integer> resourceCapacity : _replica.getCapacity().entrySet()) {
+        score = resourceCapacity.getValue() == 0 ? score

Review comment:
       nit, IMO, checking resourceCapacity.getValue() == 0 and skip the loop makes the code cleaner. And it helps to avoid the unnecessary score value update if the compiler does not optimize it automatically.

##########
File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
##########
@@ -68,23 +71,40 @@ public OptimalAssignment calculate(ClusterModel clusterModel) throws HelixRebala
     List<AssignableNode> nodes = new ArrayList<>(clusterModel.getAssignableNodes().values());
     Set<String> busyInstances =
         getBusyInstances(clusterModel.getContext().getBestPossibleAssignment().values());
-    // Sort the replicas so the input is stable for the greedy algorithm.
-    // For the other algorithm implementation, this sorting could be unnecessary.
-    for (AssignableReplica replica : getOrderedAssignableReplica(clusterModel)) {
+
+    // Compute overall utilization of the cluster. Capacity dimension -> total remaining capacity
+    Map<String, Integer> overallClusterRemainingCapacityMap =
+        computeOverallClusterRemainingCapacity(nodes);
+
+    // Create a wrapper for each AssignableReplica.
+    Set<AssignableReplicaWithScore> toBeAssignedReplicas =
+        clusterModel.getAssignableReplicaMap().values().stream()
+            .flatMap(replicas -> replicas.stream())
+            .map(replica -> new AssignableReplicaWithScore(replica, clusterModel))
+            .collect(Collectors.toSet());
+
+    while (!toBeAssignedReplicas.isEmpty()) {
+      AssignableReplicaWithScore replicaWithScore =
+          getNextAssignableReplica(toBeAssignedReplicas, overallClusterRemainingCapacityMap);

Review comment:
       Can we remove the selected replica in the getNextAssignableReplica() method? Maybe call it pollNextAssignableReplica()? Then I guess we can just use a linked list or some other more efficient data structure to make the loop faster.

##########
File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
##########
@@ -152,69 +172,136 @@ private double getAssignmentNormalizedScore(AssignableNode node, AssignableRepli
         .collect(Collectors.toList());
   }
 
-  private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) {
-    Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
-    List<AssignableReplica> orderedAssignableReplicas =
-        replicasByResource.values().stream().flatMap(replicas -> replicas.stream())
-            .collect(Collectors.toList());
-
-    Map<String, ResourceAssignment> bestPossibleAssignment =
-        clusterModel.getContext().getBestPossibleAssignment();
-    Map<String, ResourceAssignment> baselineAssignment =
-        clusterModel.getContext().getBaselineAssignment();
-
-    Map<String, Integer> replicaHashCodeMap = orderedAssignableReplicas.parallelStream().collect(
-        Collectors.toMap(AssignableReplica::toString,
-            replica -> Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet()),
-            (hash1, hash2) -> hash2));
-
-    // 1. Sort according if the assignment exists in the best possible and/or baseline assignment
-    // 2. Sort according to the state priority. Note that prioritizing the top state is required.
-    // Or the greedy algorithm will unnecessarily shuffle the states between replicas.
-    // 3. Sort according to the resource/partition name.
-    orderedAssignableReplicas.sort((replica1, replica2) -> {
-      String resourceName1 = replica1.getResourceName();
-      String resourceName2 = replica2.getResourceName();
-      if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment
-          .containsKey(resourceName2)) {
-        if (baselineAssignment.containsKey(resourceName1) == baselineAssignment
-            .containsKey(resourceName2)) {
-          // If both assignment states have/not have the resource assignment the same,
-          // compare for additional dimensions.
-          int statePriority1 = replica1.getStatePriority();
-          int statePriority2 = replica2.getStatePriority();
-          if (statePriority1 == statePriority2) {
-            // If state priorities are the same, try to randomize the replicas order. Otherwise,
-            // the same replicas might always be moved in each rebalancing. This is because their
-            // placement calculating will always happen at the critical moment while the cluster is
-            // almost close to the expected utilization.
-            //
-            // Note that to ensure the algorithm is deterministic with the same inputs, do not use
-            // Random functions here. Use hashcode based on the cluster topology information to get
-            // a controlled randomized order is good enough.
-            Integer replicaHash1 = replicaHashCodeMap.get(replica1.toString());
-            Integer replicaHash2 = replicaHashCodeMap.get(replica2.toString());
-            if (!replicaHash1.equals(replicaHash2)) {
-              return replicaHash1.compareTo(replicaHash2);
-            } else {
-              // In case of hash collision, return order according to the name.
-              return replica1.toString().compareTo(replica2.toString());
-            }
-          } else {
-            // Note we shall prioritize the replica with a higher state priority,
-            // the smaller priority number means higher priority.
-            return statePriority1 - statePriority2;
-          }
-        } else {
-          // If the baseline assignment contains the assignment, prioritize the replica.
-          return baselineAssignment.containsKey(resourceName1) ? -1 : 1;
+  private Map<String, Integer> computeOverallClusterRemainingCapacity(List<AssignableNode> nodes) {
+    Map<String, Integer> utilizationMap = new HashMap<>();
+    for (AssignableNode node : nodes) {
+      for (String capacityKey : node.getMaxCapacity().keySet()) {
+        utilizationMap.compute(capacityKey,
+            (k, v) -> v == null ? node.getRemainingCapacity().get(capacityKey)
+                : v + node.getRemainingCapacity().get(capacityKey));
+      }
+    }
+    return utilizationMap;
+  }
+
+  /**
+   * Update the overallClusterUtilMap with newly placed replica
+   */
+  private void updateOverallClusterRemainingCapacity(
+      Map<String, Integer> overallClusterRemainingCapacityMap, AssignableReplica replica) {
+    for (Map.Entry<String, Integer> resourceUsage : replica.getCapacity().entrySet()) {
+      Integer newRemainingCapacity =
+          overallClusterRemainingCapacityMap.get(resourceUsage.getKey()) - resourceUsage.getValue();
+      overallClusterRemainingCapacityMap.put(resourceUsage.getKey(), newRemainingCapacity);
+    }
+  }
+
+  private class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
+    private final AssignableReplica _replica;
+    private float _score = 0;
+    private boolean _isInBestPossibleAssignment;
+    private boolean _isInBaselineAssignment;
+    private final Integer  _replicaHash;
+
+    AssignableReplicaWithScore(AssignableReplica replica, ClusterModel clusterModel) {

Review comment:
       Just extend AssignableReplica?

##########
File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
##########
@@ -42,6 +42,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.lang.Float.compare;

Review comment:
       nit, better directly call Float.compare in the code. Static import here is not quite desired.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org