Posted to reviews@helix.apache.org by "desaikomal (via GitHub)" <gi...@apache.org> on 2023/04/14 16:26:36 UTC

[GitHub] [helix] desaikomal commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

desaikomal commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1166110025


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,156 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<PartitionWithReplicaCount>> partitionsMissingMinActiveReplicas =
+        getPartitionsNeedForRebalanceOverwrites(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites

Review Comment:
   Does this mean that these partitions are still within the delayed window, or are they marked as permanently down?



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,157 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<String>> partitionsMissingMinActiveReplicas =
+        findPartitionsMissingMinActiveReplica(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {

Review Comment:
   Note: since we already have partitionsMissingMinActiveReplicas, why are we iterating over replicaMap.keySet() again?



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,156 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<PartitionWithReplicaCount>> partitionsMissingMinActiveReplicas =
+        getPartitionsNeedForRebalanceOverwrites(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites
+      List<PartitionWithReplicaCount> partitions =
+          partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
+      if (partitions.isEmpty()) {
+        continue;
+      }
+      toBeAssignedReplicas.addAll(
+          findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
+    }
+    return toBeAssignedReplicas;
+  }
+
+  private static Map<String, List<PartitionWithReplicaCount>> getPartitionsNeedForRebalanceOverwrites(
+      ResourceControllerDataProvider clusterData,
+      Map<String, ResourceAssignment> currentAssignment) {
+    return currentAssignment.entrySet()
+        .parallelStream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> findPartitionsMissingMinActiveReplica(clusterData, entry.getValue())
+        ));
+  }
+
+  private static List<PartitionWithReplicaCount> findPartitionsMissingMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      ResourceAssignment resourceAssignment) {
+    String resourceName = resourceAssignment.getResourceName();
+    IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+    int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+        .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+            currentIdealState), currentIdealState, numReplica);
+    return resourceAssignment.getMappedPartitions()
+        .parallelStream()
+        .filter(partition -> {
+          long enabledLivePlacementCounter = resourceAssignment.getReplicaMap(partition).keySet()
+              .stream()
+              .filter(enabledLiveInstances::contains)
+              .count();
+          return enabledLivePlacementCounter < Math.min(minActiveReplica, numReplica);
+        })
+        .map(partition -> new PartitionWithReplicaCount(partition, minActiveReplica, numReplica))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * For the resource in the cluster, find additional AssignableReplica to close the gap on minActiveReplica.
+   * @param clusterData Cluster data cache.
+   * @param resourceName name of the resource
+   * @param partitions Pre-computed list of partition missing minActiveReplica
+   * @param stateInstanceMap <partition, <state, instances set>>
+   * @param liveEnabledInstances A set of live and enabled instances
+   * @return A set of AssignableReplica
+   */
+  private static Set<AssignableReplica> findAssignableReplicaForResource(

Review Comment:
   How is this function different from findToBeAssignedReplicasForMinActiveReplica?



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,157 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<String>> partitionsMissingMinActiveReplicas =
+        findPartitionsMissingMinActiveReplica(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites
+      List<String> partitions =
+          partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
+      if (partitions.isEmpty()) {
+        continue;
+      }
+      toBeAssignedReplicas.addAll(
+          findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
+    }
+    return toBeAssignedReplicas;
+  }
+
+  private static Map<String, List<String>> findPartitionsMissingMinActiveReplica(

Review Comment:
   nit: Can you please add a comment?



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,156 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<PartitionWithReplicaCount>> partitionsMissingMinActiveReplicas =
+        getPartitionsNeedForRebalanceOverwrites(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites
+      List<PartitionWithReplicaCount> partitions =
+          partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
+      if (partitions.isEmpty()) {
+        continue;
+      }
+      toBeAssignedReplicas.addAll(
+          findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
+    }
+    return toBeAssignedReplicas;
+  }
+
+  private static Map<String, List<PartitionWithReplicaCount>> getPartitionsNeedForRebalanceOverwrites(
+      ResourceControllerDataProvider clusterData,
+      Map<String, ResourceAssignment> currentAssignment) {
+    return currentAssignment.entrySet()
+        .parallelStream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> findPartitionsMissingMinActiveReplica(clusterData, entry.getValue())
+        ));
+  }
+
+  private static List<PartitionWithReplicaCount> findPartitionsMissingMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      ResourceAssignment resourceAssignment) {
+    String resourceName = resourceAssignment.getResourceName();
+    IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+    int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+        .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+            currentIdealState), currentIdealState, numReplica);
+    return resourceAssignment.getMappedPartitions()

Review Comment:
   Note to myself: I need to understand this lambda processing better.
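
   For reference, here is a minimal standalone sketch of what the filter lambda in findPartitionsMissingMinActiveReplica appears to compute. It assumes plain Java collections in place of ResourceAssignment and the cluster data cache. The class name MinActiveReplicaSketch, the method findPartitionsBelowMinActive, and the sample hosts, partitions, and states are hypothetical and not part of the PR. It also uses a sequential stream rather than parallelStream, and returns partition names instead of PartitionWithReplicaCount objects.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for the filtering logic under review; plain
// collections replace the Helix types so the sketch runs on its own.
final class MinActiveReplicaSketch {

  // assignment: partition name -> (instance name -> state)
  static List<String> findPartitionsBelowMinActive(
      Map<String, Map<String, String>> assignment,
      Set<String> enabledLiveInstances,
      int minActiveReplica,
      int numReplica) {
    // A partition never needs more live replicas than its replica count.
    int required = Math.min(minActiveReplica, numReplica);
    return assignment.entrySet().stream()
        .filter(entry -> {
          // Count only the placements that sit on live, enabled instances.
          long liveCount = entry.getValue().keySet().stream()
              .filter(enabledLiveInstances::contains)
              .count();
          return liveCount < required;
        })
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  // Sample data (assumed): three partitions, with host3 treated as down.
  static Map<String, Map<String, String>> sampleAssignment() {
    Map<String, Map<String, String>> assignment = new TreeMap<>();
    assignment.put("p0", states("host1", "host2", "host3"));
    assignment.put("p1", states("host2", "host3"));
    assignment.put("p2", states("host3"));
    return assignment;
  }

  private static Map<String, String> states(String... instances) {
    Map<String, String> replicaMap = new TreeMap<>();
    for (String instance : instances) {
      replicaMap.put(instance, "ONLINE");
    }
    return replicaMap;
  }

  public static void main(String[] args) {
    Set<String> live = new HashSet<>(Arrays.asList("host1", "host2"));
    System.out.println(findPartitionsBelowMinActive(sampleAssignment(), live, 2, 3));
  }
}
```

   With host3 down, minActiveReplica = 2 and numReplica = 3, p0 still has two live placements and is skipped, while p1 (one live placement) and p2 (none) are returned.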



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

