Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2021/04/12 21:38:41 UTC

[GitHub] [helix] NealSun96 commented on a change in pull request #1691: Improve the WAGED.ConstraintBasedAlgorithm storing logic to prioritize replica with larger impact

NealSun96 commented on a change in pull request #1691:
URL: https://github.com/apache/helix/pull/1691#discussion_r611948370



##########
File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
##########
@@ -152,69 +169,135 @@ private double getAssignmentNormalizedScore(AssignableNode node, AssignableRepli
         .collect(Collectors.toList());
   }
 
-  private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) {
-    Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
-    List<AssignableReplica> orderedAssignableReplicas =
-        replicasByResource.values().stream().flatMap(replicas -> replicas.stream())
-            .collect(Collectors.toList());
-
-    Map<String, ResourceAssignment> bestPossibleAssignment =
-        clusterModel.getContext().getBestPossibleAssignment();
-    Map<String, ResourceAssignment> baselineAssignment =
-        clusterModel.getContext().getBaselineAssignment();
-
-    Map<String, Integer> replicaHashCodeMap = orderedAssignableReplicas.parallelStream().collect(
-        Collectors.toMap(AssignableReplica::toString,
-            replica -> Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet()),
-            (hash1, hash2) -> hash2));
-
-    // 1. Sort according if the assignment exists in the best possible and/or baseline assignment
-    // 2. Sort according to the state priority. Note that prioritizing the top state is required.
-    // Or the greedy algorithm will unnecessarily shuffle the states between replicas.
-    // 3. Sort according to the resource/partition name.
-    orderedAssignableReplicas.sort((replica1, replica2) -> {
-      String resourceName1 = replica1.getResourceName();
-      String resourceName2 = replica2.getResourceName();
-      if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment
-          .containsKey(resourceName2)) {
-        if (baselineAssignment.containsKey(resourceName1) == baselineAssignment
-            .containsKey(resourceName2)) {
-          // If both assignment states have/not have the resource assignment the same,
-          // compare for additional dimensions.
-          int statePriority1 = replica1.getStatePriority();
-          int statePriority2 = replica2.getStatePriority();
-          if (statePriority1 == statePriority2) {
-            // If state priorities are the same, try to randomize the replicas order. Otherwise,
-            // the same replicas might always be moved in each rebalancing. This is because their
-            // placement calculating will always happen at the critical moment while the cluster is
-            // almost close to the expected utilization.
-            //
-            // Note that to ensure the algorithm is deterministic with the same inputs, do not use
-            // Random functions here. Use hashcode based on the cluster topology information to get
-            // a controlled randomized order is good enough.
-            Integer replicaHash1 = replicaHashCodeMap.get(replica1.toString());
-            Integer replicaHash2 = replicaHashCodeMap.get(replica2.toString());
-            if (!replicaHash1.equals(replicaHash2)) {
-              return replicaHash1.compareTo(replicaHash2);
-            } else {
-              // In case of hash collision, return order according to the name.
-              return replica1.toString().compareTo(replica2.toString());
-            }
-          } else {
-            // Note we shall prioritize the replica with a higher state priority,
-            // the smaller priority number means higher priority.
-            return statePriority1 - statePriority2;
-          }
+  private Map<String, Integer> computeOverallClusterRemainingCapacity(List<AssignableNode> nodes) {
+    Map<String, Integer> utilizationMap = new HashMap<>();
+    for (AssignableNode node : nodes) {
+      for (String capacityKey : node.getMaxCapacity().keySet()) {
+        if (utilizationMap.containsKey(capacityKey)) {

Review comment:
       Consider shortening this to `utilizationMap.compute(capacityKey, (k, v) -> v == null ? node.getRemainingCapacity().get(capacityKey) : v + node.getRemainingCapacity().get(capacityKey))`.
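
A minimal sketch of the suggested shortening, assuming each node's remaining capacity is available as a plain `Map<String, Integer>` (a stand-in for `node.getRemainingCapacity()`). The class and method names here are hypothetical and not part of the PR. `Map.merge` with `Integer::sum` is shown next to `compute` as an even shorter equivalent for this sum-or-insert pattern.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, for illustration only.
class RemainingCapacitySketch {

  // Sums remaining capacity per key using Map.compute, as suggested in the review.
  static Map<String, Integer> sumWithCompute(List<Map<String, Integer>> remainingPerNode) {
    Map<String, Integer> total = new HashMap<>();
    for (Map<String, Integer> remaining : remainingPerNode) {
      for (Map.Entry<String, Integer> entry : remaining.entrySet()) {
        // v is null the first time a key is seen, so start from the node's value.
        total.compute(entry.getKey(),
            (k, v) -> v == null ? entry.getValue() : v + entry.getValue());
      }
    }
    return total;
  }

  // Same result using Map.merge, which handles the "absent" case itself.
  static Map<String, Integer> sumWithMerge(List<Map<String, Integer>> remainingPerNode) {
    Map<String, Integer> total = new HashMap<>();
    for (Map<String, Integer> remaining : remainingPerNode) {
      for (Map.Entry<String, Integer> entry : remaining.entrySet()) {
        total.merge(entry.getKey(), entry.getValue(), Integer::sum);
      }
    }
    return total;
  }
}
```

Note that this sketch iterates over the remaining-capacity entries directly, whereas the PR iterates over `getMaxCapacity().keySet()`; the two agree only if both maps share the same keys.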

##########
File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
##########
@@ -152,69 +169,135 @@ private double getAssignmentNormalizedScore(AssignableNode node, AssignableRepli
         .collect(Collectors.toList());
   }
 
-  private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) {
-    Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
-    List<AssignableReplica> orderedAssignableReplicas =
-        replicasByResource.values().stream().flatMap(replicas -> replicas.stream())
-            .collect(Collectors.toList());
-
-    Map<String, ResourceAssignment> bestPossibleAssignment =
-        clusterModel.getContext().getBestPossibleAssignment();
-    Map<String, ResourceAssignment> baselineAssignment =
-        clusterModel.getContext().getBaselineAssignment();
-
-    Map<String, Integer> replicaHashCodeMap = orderedAssignableReplicas.parallelStream().collect(
-        Collectors.toMap(AssignableReplica::toString,
-            replica -> Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet()),
-            (hash1, hash2) -> hash2));
-
-    // 1. Sort according if the assignment exists in the best possible and/or baseline assignment
-    // 2. Sort according to the state priority. Note that prioritizing the top state is required.
-    // Or the greedy algorithm will unnecessarily shuffle the states between replicas.
-    // 3. Sort according to the resource/partition name.
-    orderedAssignableReplicas.sort((replica1, replica2) -> {
-      String resourceName1 = replica1.getResourceName();
-      String resourceName2 = replica2.getResourceName();
-      if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment
-          .containsKey(resourceName2)) {
-        if (baselineAssignment.containsKey(resourceName1) == baselineAssignment
-            .containsKey(resourceName2)) {
-          // If both assignment states have/not have the resource assignment the same,
-          // compare for additional dimensions.
-          int statePriority1 = replica1.getStatePriority();
-          int statePriority2 = replica2.getStatePriority();
-          if (statePriority1 == statePriority2) {
-            // If state priorities are the same, try to randomize the replicas order. Otherwise,
-            // the same replicas might always be moved in each rebalancing. This is because their
-            // placement calculating will always happen at the critical moment while the cluster is
-            // almost close to the expected utilization.
-            //
-            // Note that to ensure the algorithm is deterministic with the same inputs, do not use
-            // Random functions here. Use hashcode based on the cluster topology information to get
-            // a controlled randomized order is good enough.
-            Integer replicaHash1 = replicaHashCodeMap.get(replica1.toString());
-            Integer replicaHash2 = replicaHashCodeMap.get(replica2.toString());
-            if (!replicaHash1.equals(replicaHash2)) {
-              return replicaHash1.compareTo(replicaHash2);
-            } else {
-              // In case of hash collision, return order according to the name.
-              return replica1.toString().compareTo(replica2.toString());
-            }
-          } else {
-            // Note we shall prioritize the replica with a higher state priority,
-            // the smaller priority number means higher priority.
-            return statePriority1 - statePriority2;
-          }
+  private Map<String, Integer> computeOverallClusterRemainingCapacity(List<AssignableNode> nodes) {
+    Map<String, Integer> utilizationMap = new HashMap<>();
+    for (AssignableNode node : nodes) {
+      for (String capacityKey : node.getMaxCapacity().keySet()) {
+        if (utilizationMap.containsKey(capacityKey)) {
+          Integer newUtil =
+              utilizationMap.get(capacityKey) + node.getRemainingCapacity().get(capacityKey);
+          // update util
+          utilizationMap.put(capacityKey, newUtil);
         } else {
-          // If the baseline assignment contains the assignment, prioritize the replica.
-          return baselineAssignment.containsKey(resourceName1) ? -1 : 1;
+          Integer newUtil = node.getRemainingCapacity().get(capacityKey);
+          utilizationMap.put(capacityKey, newUtil);
         }
+      }
+    }
+    return utilizationMap;
+  }
+
+  /**
+   * Update the overallClusterUtilMap with newly placed replica
+   */
+  private void updateOverallClusterRemainingCapacity(
+      Map<String, Integer> overallClusterRemainingCapacityMap, AssignableReplica replica) {
+    for (Map.Entry<String, Integer> resourceUsage : replica.getCapacity().entrySet()) {
+      Integer newRemainingCapacity =
+          overallClusterRemainingCapacityMap.get(resourceUsage.getKey()) - resourceUsage.getValue();
+      overallClusterRemainingCapacityMap.put(resourceUsage.getKey(), newRemainingCapacity);
+    }
+  }
+
+  private class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
+    private final AssignableReplica _replica;
+    private int _score = 0;
+    private boolean _isInBestPossibleAssignment;
+    private boolean _isInBaselineAssignment;
+    private final Integer  _replicaHash;
+
+    AssignableReplicaWithScore(AssignableReplica replica, ClusterModel clusterModel) {
+      _replica = replica;
+      _isInBestPossibleAssignment = clusterModel.getContext().getBestPossibleAssignment()
+          .containsKey(replica.getResourceName());
+      _isInBaselineAssignment =
+          clusterModel.getContext().getBaselineAssignment().containsKey(replica.getResourceName());
+      _replicaHash = Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet());
+    }
+
+    public void reComputeScore(Map<String, Integer> overallClusterRemainingCapMap) {
+      int score = 0;
+      // score = SUM(weight * (resource_capacity/cluster_capacity) where weight = 1/(1-total_util%)
+      // it could be be simplified to "resource_capacity/cluster_remainingCapacity".
+      for (Map.Entry<String, Integer> resourceCapacity : _replica.getCapacity().entrySet()) {
+        score += resourceCapacity.getValue() * 100 / (
+            overallClusterRemainingCapMap.get(resourceCapacity.getKey())
+                + 0.01f /*avoid divide by 0*/);

Review comment:
       Does 0.01 really make sense here? If a capacity is measured in decimals, 0.01 can be a significant portion of it. As an extreme example, if the total cluster capacity is 0.01 TB, adding 0.01 is a 100% increase to the remaining capacity.
   
   On the other hand, why are we handling the "remaining cap = 0" case at all? When that happens, the assignment is already doomed, right? I don't think it's possible to place this replica anymore.
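
A minimal sketch of one alternative to the `+ 0.01f` guard, assuming the zero-remaining case is handled explicitly instead of being smoothed over. `ReplicaScoreSketch`, `computeScore`, and the choice to return `Double.MAX_VALUE` are hypothetical and not part of the PR; the two maps stand in for `_replica.getCapacity()` and `overallClusterRemainingCapMap`. The score is accumulated as a `double`, which also avoids the implicit narrowing that happens when a `float` result is added to an `int` with `+=`.

```java
import java.util.Map;

// Hypothetical helper class, for illustration only.
class ReplicaScoreSketch {

  // score = SUM(100 * replicaCapacity / clusterRemainingCapacity) over capacity keys.
  static double computeScore(Map<String, Integer> replicaCapacity,
      Map<String, Integer> clusterRemaining) {
    double score = 0.0;
    for (Map.Entry<String, Integer> usage : replicaCapacity.entrySet()) {
      if (usage.getValue() == 0) {
        // The replica needs nothing in this dimension, so it adds nothing to the score.
        continue;
      }
      int remaining = clusterRemaining.getOrDefault(usage.getKey(), 0);
      if (remaining <= 0) {
        // Assumption: with no capacity left the replica cannot be placed anyway,
        // so mark it as maximally constrained instead of dividing by an epsilon.
        return Double.MAX_VALUE;
      }
      score += usage.getValue() * 100.0 / remaining;
    }
    return score;
  }
}
```

Whether such a replica should be ranked first, skipped, or fail the rebalance early is a design decision for the PR author; the sketch only shows that the divide-by-zero case can be separated from the scoring formula.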

##########
File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
##########
@@ -68,23 +68,40 @@ public OptimalAssignment calculate(ClusterModel clusterModel) throws HelixRebala
     List<AssignableNode> nodes = new ArrayList<>(clusterModel.getAssignableNodes().values());
     Set<String> busyInstances =
         getBusyInstances(clusterModel.getContext().getBestPossibleAssignment().values());
-    // Sort the replicas so the input is stable for the greedy algorithm.
-    // For the other algorithm implementation, this sorting could be unnecessary.
-    for (AssignableReplica replica : getOrderedAssignableReplica(clusterModel)) {
+
+    // Compute overall utilization of the cluster. Capacity dimension -> total remaining capacity
+    Map<String, Integer> overallClusterRemainingCapacityMap =
+        computeOverallClusterRemainingCapacity(nodes);
+
+    // Create a wrapper for each AssignableReplica.
+    Set<AssignableReplicaWithScore> toBeAssignedReplicas =
+        clusterModel.getAssignableReplicaMap().values().stream()
+            .flatMap(replicas -> replicas.stream())
+            .map(replica -> new AssignableReplicaWithScore(replica, clusterModel))
+            .collect(Collectors.toSet());
+
+    AssignableReplicaWithScore replica =
+        getNextAssignableReplica(toBeAssignedReplicas,  overallClusterRemainingCapacityMap);
+    while (replica != null) {
+      toBeAssignedReplicas.remove(replica);
+      AssignableReplica rawReplica = replica.getRawAssignableReplica();

Review comment:
       (This is related to my comment on function naming for `AssignableReplicaWithScore`.)
   `rawReplica` here is really just `replica`, and the earlier `replica` should be `replicaWithScore`, right? I believe that's clearer, since the names then reflect their classes, and `getRawAssignableReplica()` could simply become `getAssignableReplica()`.
   

##########
File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
##########
@@ -152,69 +169,135 @@ private double getAssignmentNormalizedScore(AssignableNode node, AssignableRepli
         .collect(Collectors.toList());
   }
 
-  private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) {
-    Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
-    List<AssignableReplica> orderedAssignableReplicas =
-        replicasByResource.values().stream().flatMap(replicas -> replicas.stream())
-            .collect(Collectors.toList());
-
-    Map<String, ResourceAssignment> bestPossibleAssignment =
-        clusterModel.getContext().getBestPossibleAssignment();
-    Map<String, ResourceAssignment> baselineAssignment =
-        clusterModel.getContext().getBaselineAssignment();
-
-    Map<String, Integer> replicaHashCodeMap = orderedAssignableReplicas.parallelStream().collect(
-        Collectors.toMap(AssignableReplica::toString,
-            replica -> Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet()),
-            (hash1, hash2) -> hash2));
-
-    // 1. Sort according if the assignment exists in the best possible and/or baseline assignment
-    // 2. Sort according to the state priority. Note that prioritizing the top state is required.
-    // Or the greedy algorithm will unnecessarily shuffle the states between replicas.
-    // 3. Sort according to the resource/partition name.
-    orderedAssignableReplicas.sort((replica1, replica2) -> {
-      String resourceName1 = replica1.getResourceName();
-      String resourceName2 = replica2.getResourceName();
-      if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment
-          .containsKey(resourceName2)) {
-        if (baselineAssignment.containsKey(resourceName1) == baselineAssignment
-            .containsKey(resourceName2)) {
-          // If both assignment states have/not have the resource assignment the same,
-          // compare for additional dimensions.
-          int statePriority1 = replica1.getStatePriority();
-          int statePriority2 = replica2.getStatePriority();
-          if (statePriority1 == statePriority2) {
-            // If state priorities are the same, try to randomize the replicas order. Otherwise,
-            // the same replicas might always be moved in each rebalancing. This is because their
-            // placement calculating will always happen at the critical moment while the cluster is
-            // almost close to the expected utilization.
-            //
-            // Note that to ensure the algorithm is deterministic with the same inputs, do not use
-            // Random functions here. Use hashcode based on the cluster topology information to get
-            // a controlled randomized order is good enough.
-            Integer replicaHash1 = replicaHashCodeMap.get(replica1.toString());
-            Integer replicaHash2 = replicaHashCodeMap.get(replica2.toString());
-            if (!replicaHash1.equals(replicaHash2)) {
-              return replicaHash1.compareTo(replicaHash2);
-            } else {
-              // In case of hash collision, return order according to the name.
-              return replica1.toString().compareTo(replica2.toString());
-            }
-          } else {
-            // Note we shall prioritize the replica with a higher state priority,
-            // the smaller priority number means higher priority.
-            return statePriority1 - statePriority2;
-          }
+  private Map<String, Integer> computeOverallClusterRemainingCapacity(List<AssignableNode> nodes) {
+    Map<String, Integer> utilizationMap = new HashMap<>();
+    for (AssignableNode node : nodes) {
+      for (String capacityKey : node.getMaxCapacity().keySet()) {
+        if (utilizationMap.containsKey(capacityKey)) {
+          Integer newUtil =
+              utilizationMap.get(capacityKey) + node.getRemainingCapacity().get(capacityKey);
+          // update util
+          utilizationMap.put(capacityKey, newUtil);
         } else {
-          // If the baseline assignment contains the assignment, prioritize the replica.
-          return baselineAssignment.containsKey(resourceName1) ? -1 : 1;
+          Integer newUtil = node.getRemainingCapacity().get(capacityKey);
+          utilizationMap.put(capacityKey, newUtil);
         }
+      }
+    }
+    return utilizationMap;
+  }
+
+  /**
+   * Update the overallClusterUtilMap with newly placed replica
+   */
+  private void updateOverallClusterRemainingCapacity(
+      Map<String, Integer> overallClusterRemainingCapacityMap, AssignableReplica replica) {
+    for (Map.Entry<String, Integer> resourceUsage : replica.getCapacity().entrySet()) {
+      Integer newRemainingCapacity =
+          overallClusterRemainingCapacityMap.get(resourceUsage.getKey()) - resourceUsage.getValue();
+      overallClusterRemainingCapacityMap.put(resourceUsage.getKey(), newRemainingCapacity);
+    }
+  }
+
+  private class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {

Review comment:
       Two comments about function naming here: 
   
   1. We could just say "computeScore" instead of "reComputeScore". We're re-computing by calling the compute function again. 
   2. We could just say "getAssignableReplica" instead of "getRawAssignableReplica". 
   
   Happy to discuss. 
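
A rough sketch of how the two renames could read, together with the `replicaWithScore` / `replica` variable names suggested for the loop in `calculate()`. Everything here is a simplified, hypothetical stand-in: the `AssignableReplica` class below only carries a capacity map, and the scoring body is reduced to the formula from the PR comment, without any divide-by-zero handling.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Helix's AssignableReplica; only the capacity map is modeled.
class AssignableReplica {
  private final Map<String, Integer> _capacity;

  AssignableReplica(Map<String, Integer> capacity) {
    _capacity = capacity;
  }

  Map<String, Integer> getCapacity() {
    return _capacity;
  }
}

// Simplified wrapper using the names proposed in the review.
class AssignableReplicaWithScore {
  private final AssignableReplica _replica;
  private double _score;

  AssignableReplicaWithScore(AssignableReplica replica) {
    _replica = replica;
  }

  // Was getRawAssignableReplica() in the PR.
  AssignableReplica getAssignableReplica() {
    return _replica;
  }

  double getScore() {
    return _score;
  }

  // Was reComputeScore() in the PR; calling it again simply recomputes the score.
  void computeScore(Map<String, Integer> overallClusterRemainingCapMap) {
    double score = 0;
    for (Map.Entry<String, Integer> usage : _replica.getCapacity().entrySet()) {
      // Assumes every capacity key of the replica is present in the cluster map.
      score += usage.getValue() * 100.0 / overallClusterRemainingCapMap.get(usage.getKey());
    }
    _score = score;
  }
}

class NamingSketch {
  static double demo() {
    Map<String, Integer> capacity = new HashMap<>();
    capacity.put("CPU", 5);
    Map<String, Integer> remaining = new HashMap<>();
    remaining.put("CPU", 50);

    // Variable names mirror the class names, as suggested in the review.
    AssignableReplicaWithScore replicaWithScore =
        new AssignableReplicaWithScore(new AssignableReplica(capacity));
    replicaWithScore.computeScore(remaining);
    AssignableReplica replica = replicaWithScore.getAssignableReplica();
    return replica.getCapacity().isEmpty() ? 0 : replicaWithScore.getScore();
  }
}
```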



