Posted to reviews@helix.apache.org by "qqu0127 (via GitHub)" <gi...@apache.org> on 2023/04/13 20:59:50 UTC

[GitHub] [helix] qqu0127 opened a new pull request, #2447: WAGED rebalance overwrite redesign -- part 2

qqu0127 opened a new pull request, #2447:
URL: https://github.com/apache/helix/pull/2447

   ### Issues
   
   - [X] My PR addresses the following Helix issues and references them in the PR description:
   
   Resolves https://github.com/apache/helix/issues/2441
   
   ### Description
   
   - [ ] Here are some details about my PR, including screenshots of any UI changes:
   
   Depends on https://github.com/apache/helix/pull/2444
   This PR integrates the redesigned minActiveReplica handling logic into the WAGED rebalancer.
   
   Description TBD
   
   ### Tests
   
   - [ ] The following tests are written for this issue:
   
   Test TBD
   
   ### Changes that Break Backward Compatibility (Optional)
   
   - My PR contains changes that break backward compatibility or previous assumptions for certain methods or API. They include:
   
   (Consider including all behavior changes for public methods or API. Also include these changes in merge description so that other developers are aware of these changes. This allows them to make relevant code changes in feature branches accounting for the new method/API behavior.)
   
   ### Documentation (Optional)
   
   - In case of new functionality, my PR adds documentation in the following wiki page:
   
   (Link the GitHub wiki you added)
   
   ### Commits
   
   - My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - My diff has been formatted using helix-style.xml 
   (helix-style-intellij.xml if IntelliJ IDE is used)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] qqu0127 commented on a diff in pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1175802180


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -432,6 +481,17 @@ protected Map<String, ResourceAssignment> emergencyRebalance(
 
     // Step 3: persist result to metadata store
     persistBestPossibleAssignment(newAssignment);
+
+    // Step 4: handle delayed rebalance minActiveReplica
+    // Note this result is one step branching from the main calculation and SHOULD NOT be persisted -- it is temporary,
+    // and only apply during the delayed window of those offline yet active nodes, a definitive resolution will happen
+    // once the node comes back of remain offline after the delayed window.
+    Map<String, ResourceAssignment> assignmentWithDelayedRebalanceAdjust = newAssignment;
+    if (_partialRebalanceRunner.isAsyncPartialRebalanceEnabled()) {

Review Comment:
   One correction: this is already in the emergency pipeline. The check is about how we call partial rebalance, sync or async. `handleDelayedRebalanceMinActiveReplica` is always called, but it has to follow the right order (after partial rebalance if sync).





[GitHub] [helix] desaikomal commented on a diff in pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "desaikomal (via GitHub)" <gi...@apache.org>.
desaikomal commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1175648969


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -432,6 +481,17 @@ protected Map<String, ResourceAssignment> emergencyRebalance(
 
     // Step 3: persist result to metadata store
     persistBestPossibleAssignment(newAssignment);
+
+    // Step 4: handle delayed rebalance minActiveReplica
+    // Note this result is one step branching from the main calculation and SHOULD NOT be persisted -- it is temporary,
+    // and only apply during the delayed window of those offline yet active nodes, a definitive resolution will happen
+    // once the node comes back of remain offline after the delayed window.
+    Map<String, ResourceAssignment> assignmentWithDelayedRebalanceAdjust = newAssignment;
+    if (_partialRebalanceRunner.isAsyncPartialRebalanceEnabled()) {

Review Comment:
   Shouldn't we be calling the emergency pipeline regardless of partialRebalanceRunner?





[GitHub] [helix] qqu0127 commented on pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on PR #2447:
URL: https://github.com/apache/helix/pull/2447#issuecomment-1521959796

   This PR is ready to merge, approved by @desaikomal and @junkaixue 
   
   Commit message:
   Integrates the redesigned minActiveReplica handling logic into WAGED rebalancer
   




[GitHub] [helix] junkaixue commented on a diff in pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "junkaixue (via GitHub)" <gi...@apache.org>.
junkaixue commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1173173685


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -385,6 +375,63 @@ protected List<HelixRebalanceException.Type> failureTypesToPropagate() {
     return FAILURE_TYPES_TO_PROPAGATE;
   }
 
+  /**
+   * Some partition may fail to meet minActiveReplica due to delayed rebalance, because some instances are offline yet
+   * active. In this case, additional replicas have to be brought up -- until either the instance gets back, or timeout,
+   * at which we have a more permanent resolution.
+   * The term "overwrite" is inherited from historical approach, however, it's no longer technically an overwrite.
+   * It's a formal rebalance process that goes through the algorithm and all constraints.
+   * @param clusterData Cluster data cache
+   * @param resourceMap The map of resource to calculate
+   * @param activeNodes All active nodes (live nodes plus offline-yet-active nodes) while considering cluster's
+   *                    delayed rebalance config
+   * @param currentResourceAssignment The current resource assignment or the best possible assignment computed from last
+   *                           emergency rebalance.
+   * @param algorithm The rebalance algorithm
+   * @return The resource assignment with delayed rebalance minActiveReplica
+   */
+  private Map<String, ResourceAssignment> handleDelayedRebalanceMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Resource> resourceMap,
+      Set<String> activeNodes,
+      Map<String, ResourceAssignment> currentResourceAssignment,
+      RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+    // the "real" live nodes at the time
+    final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
+      // no need for additional process, return the current resource assignment
+      return currentResourceAssignment;
+    }
+    _rebalanceOverwriteCounter.increment(1L);
+    _rebalanceOverwriteLatency.startMeasuringLatency();
+    LOG.info("Start delayed rebalance overwrites in emergency rebalance.");
+    try {
+      // use the "real" live and enabled instances for calculation
+      ClusterModel clusterModel = ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(
+          clusterData, resourceMap, enabledLiveInstances, currentResourceAssignment);
+      Map<String, ResourceAssignment> assignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);

Review Comment:
   NIT: no further usage of clusterModel. You can merge these two lines.



##########
helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java:
##########
@@ -177,12 +177,8 @@ public static void scheduleOnDemandPipeline(String clusterName, long delay) {
 
   public static void scheduleOnDemandPipeline(String clusterName, long delay,
       boolean shouldRefreshCache) {
-    if (clusterName == null) {
-      LOG.error("Failed to issue a pipeline run. ClusterName is null.");
-      return;
-    }
-    if (delay < 0L) {
-      LOG.error("Failed to issue a pipeline run. Delay is invalid.");
+    if (clusterName == null || delay < 0L) {
+      LOG.warn("ClusterName is null or delay is invalid, skip the pipeline issuing.");

Review Comment:
   It's OK to merge them, but print the values out. Otherwise, when debugging, it will be hard for us to tell which one caused the failure.
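
A minimal sketch of what this could look like, assuming a standalone helper rather than the real `scheduleOnDemandPipeline`. The class name `PipelineGuardSketch`, the method `isValidRequest`, and the use of `java.util.logging` (instead of the project's own logger) are illustrative assumptions, not Helix code.

```java
import java.util.logging.Logger;

/** Hypothetical stand-in for the argument check discussed above. */
final class PipelineGuardSketch {
  private static final Logger LOG = Logger.getLogger(PipelineGuardSketch.class.getName());

  /**
   * Returns true when both arguments are usable. The two checks stay merged
   * into one condition, but the log line carries both values so the
   * offending one is visible when debugging.
   */
  static boolean isValidRequest(String clusterName, long delay) {
    if (clusterName == null || delay < 0L) {
      LOG.warning(String.format(
          "Skip issuing the pipeline run: clusterName=%s, delay=%d", clusterName, delay));
      return false;
    }
    return true;
  }
}
```

With this shape, a rejected call logs the actual `clusterName` and `delay`, so the single merged branch does not hide which input was bad.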



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -311,30 +311,70 @@ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica
     Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
 
     for (String resourceName : resources) {
-      // <partition, <state, instances set>>
-      Map<String, Map<String, Set<String>>> stateInstanceMap =
-          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
       ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
-      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      IdealState idealState = clusterData.getIdealState(resourceName);
+      String modelDef = idealState.getStateModelDefRef();
       Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      ResourceConfig mergedResourceConfig =
+          ResourceConfig.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName), idealState);

Review Comment:
   Is this because of user fixed assignment?



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -335,6 +335,42 @@ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica
     return toBeAssignedReplicas;
   }
 
+  /**
+   * Merge entries from currentResourceAssignment to newAssignment.
+   * To handle minActiveReplica for delayed rebalance, new assignment is computed based on enabled live instances, but
+   * could miss out current partition allocation still on offline instances (within delayed window).
+   * The merge process is independent for each resource; for each resource-partition, it adds the <instance, state> pair
+   * to newAssignment if it's not there yet; in other word, the entries in newAssignment won't be override.
+   * @param newAssignment newAssignment to merge, this map is getting updated during this method.
+   * @param currentResourceAssignment the current resource assignment
+   * @param enabledLiveInstances the set of enabled live instance
+   */
+  public static void mergeAssignments(Map<String, ResourceAssignment> newAssignment,
+      Map<String, ResourceAssignment> currentResourceAssignment,
+      Set<String> enabledLiveInstances) {
+    // merge with current assignment for partitions assigned on rest of the instances (not immediately live)
+    currentResourceAssignment.entrySet().parallelStream().forEach(entry -> {
+      String resourceName = entry.getKey();
+      ResourceAssignment currentAssignment = entry.getValue();
+      for (Partition partition : currentAssignment.getMappedPartitions()) {
+        currentAssignment.getReplicaMap(partition).entrySet().stream() // <instance, state>
+            // the existing partitions on the enabledLiveInstances are pre-allocated, only process for the rest
+            .filter(e -> !enabledLiveInstances.contains(e.getKey()) ||
+                !newAssignment.containsKey(resourceName) ||
+                !newAssignment.get(resourceName).getReplicaMap(partition).containsKey(e.getKey()))

Review Comment:
   I know the computed result is a merge of the two. What I would like to know is whether the input newAssignment contains only the single replica that needs to be reassigned.
   
   The reason I ask is that this logic is very complicated. I am thinking about what the best way to merge them is.
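
For reference while reading the filter, here is a minimal sketch of the contract stated in the Javadoc (add an `<instance, state>` pair only if newAssignment does not already have it). It assumes plain nested maps (resource -> partition -> instance -> state) in place of `ResourceAssignment`, and it deliberately omits the `enabledLiveInstances` filter from the diff. The class name `ForwardMergeSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-map stand-in: resource -> partition -> (instance -> state). */
final class ForwardMergeSketch {
  /**
   * Copies every pair from currentAssignment into newAssignment unless
   * newAssignment already has an entry for that instance.
   * newAssignment is updated in place.
   */
  static void mergeAssignments(
      Map<String, Map<String, Map<String, String>>> newAssignment,
      Map<String, Map<String, Map<String, String>>> currentAssignment) {
    currentAssignment.forEach((resource, partitions) ->
        partitions.forEach((partition, replicas) ->
            replicas.forEach((instance, state) -> {
              Map<String, String> target = newAssignment
                  .computeIfAbsent(resource, r -> new HashMap<>())
                  .computeIfAbsent(partition, p -> new HashMap<>());
              // Entries already in newAssignment win; only missing pairs are added.
              target.putIfAbsent(instance, state);
            })));
  }
}
```

In this simplified form the only rule is "existing entries in newAssignment are never overridden"; the extra conditions in the real filter are not modeled.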





[GitHub] [helix] junkaixue commented on a diff in pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "junkaixue (via GitHub)" <gi...@apache.org>.
junkaixue commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1171623918


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -335,6 +335,42 @@ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica
     return toBeAssignedReplicas;
   }
 
+  /**
+   * Merge entries from currentResourceAssignment to newAssignment.
+   * To handle minActiveReplica for delayed rebalance, new assignment is computed based on enabled live instances, but
+   * could miss out current partition allocation still on offline instances (within delayed window).
+   * The merge process is independent for each resource; for each resource-partition, it adds the <instance, state> pair
+   * to newAssignment if it's not there yet; in other word, the entries in newAssignment won't be override.
+   * @param newAssignment newAssignment to merge, this map is getting updated during this method.
+   * @param currentResourceAssignment the current resource assignment
+   * @param enabledLiveInstances the set of enabled live instance
+   */
+  public static void mergeAssignments(Map<String, ResourceAssignment> newAssignment,
+      Map<String, ResourceAssignment> currentResourceAssignment,
+      Set<String> enabledLiveInstances) {
+    // merge with current assignment for partitions assigned on rest of the instances (not immediately live)
+    currentResourceAssignment.entrySet().parallelStream().forEach(entry -> {
+      String resourceName = entry.getKey();
+      ResourceAssignment currentAssignment = entry.getValue();
+      for (Partition partition : currentAssignment.getMappedPartitions()) {
+        currentAssignment.getReplicaMap(partition).entrySet().stream() // <instance, state>
+            // the existing partitions on the enabledLiveInstances are pre-allocated, only process for the rest
+            .filter(e -> !enabledLiveInstances.contains(e.getKey()) ||
+                !newAssignment.containsKey(resourceName) ||
+                !newAssignment.get(resourceName).getReplicaMap(partition).containsKey(e.getKey()))

Review Comment:
   I forgot the details of the last PR. When we recompute the new assignment, does that partition map contain only the single messed-up replica, or the entire reassigned partition?



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -432,6 +479,17 @@ protected Map<String, ResourceAssignment> emergencyRebalance(
 
     // Step 3: persist result to metadata store
     persistBestPossibleAssignment(newAssignment);
+
+    // Step 4: handle delayed rebalance minActiveReplica
+    // Note this result is one step branching from the main calculation and SHOULD NOT be persisted -- it is temporary,
+    // and only apply during the delayed window of those offline yet active nodes, a definitive resolution will happen
+    // once the node comes back of remain offline after the delayed window.
+    Map<String, ResourceAssignment> assignmentWithDelayedRebalanceAdjust = newAssignment;
+    if (_partialRebalanceRunner.isAsyncPartialRebalanceEnabled()) {
+      assignmentWithDelayedRebalanceAdjust =
+          handleDelayedRebalanceMinActiveReplica(clusterData, resourceMap, activeNodes, newAssignment, algorithm);
+    }

Review Comment:
   We changed the check condition. The previous one meant that we perform the recompute only if we have delayed nodes; now we compute it whenever delay is enabled. That would add more computing time.
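
A minimal sketch of the cheaper condition, assuming "delayed nodes" means nodes that are in the active set but not in the enabled live set. The early return in `handleDelayedRebalanceMinActiveReplica` compares the same two sets via `activeNodes.equals(enabledLiveInstances)`. The class and method names below are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: decide whether the extra recompute is worth running. */
final class DelayedNodeCheckSketch {
  /** Nodes still counted as active (inside the delay window) but not actually live and enabled. */
  static Set<String> delayedNodes(Set<String> activeNodes, Set<String> enabledLiveInstances) {
    Set<String> delayed = new HashSet<>(activeNodes);
    delayed.removeAll(enabledLiveInstances);
    return delayed;
  }

  /** Recompute only when at least one delayed node exists. */
  static boolean needsRecompute(Set<String> activeNodes, Set<String> enabledLiveInstances) {
    return !delayedNodes(activeNodes, enabledLiveInstances).isEmpty();
  }
}
```

When every active node is also live and enabled, `needsRecompute` is false and the additional calculation can be skipped.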





[GitHub] [helix] junkaixue merged pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "junkaixue (via GitHub)" <gi...@apache.org>.
junkaixue merged PR #2447:
URL: https://github.com/apache/helix/pull/2447




[GitHub] [helix] qqu0127 commented on pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on PR #2447:
URL: https://github.com/apache/helix/pull/2447#issuecomment-1521012172

   > Thanks Quincy. I just have one doubt but code changes look good. Is it possible that we couldn't satisfy the minActiveReplica as part of constraint? How do we handle it today?
   
   Hi @desaikomal, the current implementation is best effort, so it's possible that the end result doesn't satisfy minActiveReplica. In the new implementation, the calculation may fail, in which case the previous best possible assignment is returned.
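
The fallback behavior described here can be sketched as follows. This is an illustration only: `FallbackSketch` and `computeOrFallback` are hypothetical names, and the real code may catch a narrower exception type and log or record metrics on failure.

```java
import java.util.Map;
import java.util.concurrent.Callable;

/** Hypothetical illustration of "return the previous best possible assignment on failure". */
final class FallbackSketch {
  /**
   * Runs the calculation and returns its result. If the calculation throws,
   * the previously computed assignment is returned unchanged.
   */
  static <K, V> Map<K, V> computeOrFallback(
      Callable<Map<K, V>> calculation, Map<K, V> previousBestPossible) {
    try {
      return calculation.call();
    } catch (Exception e) {
      // Assumption: any failure falls back to the earlier result.
      return previousBestPossible;
    }
  }
}
```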




[GitHub] [helix] junkaixue commented on a diff in pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "junkaixue (via GitHub)" <gi...@apache.org>.
junkaixue commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1173175092


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -311,30 +311,70 @@ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica
     Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
 
     for (String resourceName : resources) {
-      // <partition, <state, instances set>>
-      Map<String, Map<String, Set<String>>> stateInstanceMap =
-          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
       ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
-      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      IdealState idealState = clusterData.getIdealState(resourceName);
+      String modelDef = idealState.getStateModelDefRef();
       Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      ResourceConfig mergedResourceConfig =
+          ResourceConfig.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName), idealState);
+
       // keep all current assignment and add to allocated replicas
       resourceAssignment.getMappedPartitions().forEach(partition ->
           resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
               allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
-                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), mergedResourceConfig,
                       partition.getPartitionName(), state, statePriorityMap.get(state)))));
       // only proceed for resource requiring delayed rebalance overwrites
       List<String> partitions =
           partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
       if (partitions.isEmpty()) {
         continue;
       }
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(resourceAssignment);
       toBeAssignedReplicas.addAll(
           findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
     }
     return toBeAssignedReplicas;
   }
 
+  /**
+   * Merge entries from currentResourceAssignment to newAssignment.
+   * To handle minActiveReplica for delayed rebalance, new assignment is computed based on enabled live instances, but
+   * could miss out current partition allocation still on offline instances (within delayed window).
+   * The merge process is independent for each resource; for each resource-partition, it adds the <instance, state> pair
+   * to newAssignment if it's not there yet; in other word, the entries in newAssignment won't be override.
+   * @param newAssignment newAssignment to merge, this map is getting updated during this method.
+   * @param currentResourceAssignment the current resource assignment
+   * @param enabledLiveInstances the set of enabled live instance
+   */
+  public static void mergeAssignments(Map<String, ResourceAssignment> newAssignment,
+      Map<String, ResourceAssignment> currentResourceAssignment,
+      Set<String> enabledLiveInstances) {
+    // merge with current assignment for partitions assigned on rest of the instances (not immediately live)
+    currentResourceAssignment.entrySet().parallelStream().forEach(entry -> {
+      String resourceName = entry.getKey();
+      ResourceAssignment currentAssignment = entry.getValue();
+      for (Partition partition : currentAssignment.getMappedPartitions()) {
+        currentAssignment.getReplicaMap(partition).entrySet().stream() // <instance, state>
+            // the existing partitions on the enabledLiveInstances are pre-allocated, only process for the rest

Review Comment:
   Why not do it the reverse way? Since we know the map in newAssignment will be the right one, we can just let the entries in newAssignment override those in currentAssignment.
   
   In that case, there is no need to check a lot of things.
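
A minimal sketch of this reverse direction, assuming plain nested maps (resource -> partition -> instance -> state) in place of `ResourceAssignment`. The class name `ReverseMergeSketch` and the choice to return a fresh copy rather than mutate the current assignment are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-map stand-in: resource -> partition -> (instance -> state). */
final class ReverseMergeSketch {
  /** Starts from a copy of currentAssignment, then lets newAssignment override it. */
  static Map<String, Map<String, Map<String, String>>> overlay(
      Map<String, Map<String, Map<String, String>>> currentAssignment,
      Map<String, Map<String, Map<String, String>>> newAssignment) {
    Map<String, Map<String, Map<String, String>>> merged = new HashMap<>();
    // Deep-copy the current assignment so the input is left untouched.
    currentAssignment.forEach((resource, partitions) -> {
      Map<String, Map<String, String>> copy = new HashMap<>();
      partitions.forEach((partition, replicas) -> copy.put(partition, new HashMap<>(replicas)));
      merged.put(resource, copy);
    });
    // Entries from newAssignment replace or extend what was copied.
    newAssignment.forEach((resource, partitions) ->
        partitions.forEach((partition, replicas) ->
            merged.computeIfAbsent(resource, r -> new HashMap<>())
                .computeIfAbsent(partition, p -> new HashMap<>())
                .putAll(replicas)));
    return merged;
  }
}
```

No per-entry filter is needed in this form, at the cost of copying every entry of the current assignment up front.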





[GitHub] [helix] qqu0127 commented on a diff in pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1173896583


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -311,30 +311,70 @@ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica
     Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
 
     for (String resourceName : resources) {
-      // <partition, <state, instances set>>
-      Map<String, Map<String, Set<String>>> stateInstanceMap =
-          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
       ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
-      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      IdealState idealState = clusterData.getIdealState(resourceName);
+      String modelDef = idealState.getStateModelDefRef();
       Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      ResourceConfig mergedResourceConfig =
+          ResourceConfig.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName), idealState);
+
       // keep all current assignment and add to allocated replicas
       resourceAssignment.getMappedPartitions().forEach(partition ->
           resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
               allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
-                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), mergedResourceConfig,
                       partition.getPartitionName(), state, statePriorityMap.get(state)))));
       // only proceed for resource requiring delayed rebalance overwrites
       List<String> partitions =
           partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
       if (partitions.isEmpty()) {
         continue;
       }
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(resourceAssignment);
       toBeAssignedReplicas.addAll(
           findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
     }
     return toBeAssignedReplicas;
   }
 
+  /**
+   * Merge entries from currentResourceAssignment to newAssignment.
+   * To handle minActiveReplica for delayed rebalance, new assignment is computed based on enabled live instances, but
+   * could miss out current partition allocation still on offline instances (within delayed window).
+   * The merge process is independent for each resource; for each resource-partition, it adds the <instance, state> pair
+   * to newAssignment if it's not there yet; in other word, the entries in newAssignment won't be override.
+   * @param newAssignment newAssignment to merge, this map is getting updated during this method.
+   * @param currentResourceAssignment the current resource assignment
+   * @param enabledLiveInstances the set of enabled live instance
+   */
+  public static void mergeAssignments(Map<String, ResourceAssignment> newAssignment,
+      Map<String, ResourceAssignment> currentResourceAssignment,
+      Set<String> enabledLiveInstances) {
+    // merge with current assignment for partitions assigned on rest of the instances (not immediately live)
+    currentResourceAssignment.entrySet().parallelStream().forEach(entry -> {
+      String resourceName = entry.getKey();
+      ResourceAssignment currentAssignment = entry.getValue();
+      for (Partition partition : currentAssignment.getMappedPartitions()) {
+        currentAssignment.getReplicaMap(partition).entrySet().stream() // <instance, state>
+            // the existing partitions on the enabledLiveInstances are pre-allocated, only process for the rest

Review Comment:
   Yes, that's a good way to simplify the logic. 
   One tradeoff, though, is that we may loop through more elements and use more memory. The differences between newAssignment and currentAssignment are the newly brought up replicas on live instances and the replicas on temporarily downed instances. The whole assignment map might be large, while the diff can be small. 
   Let me know what you think about this. Thanks





[GitHub] [helix] desaikomal commented on a diff in pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "desaikomal (via GitHub)" <gi...@apache.org>.
desaikomal commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1176619084


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -432,6 +481,17 @@ protected Map<String, ResourceAssignment> emergencyRebalance(
 
     // Step 3: persist result to metadata store
     persistBestPossibleAssignment(newAssignment);
+
+    // Step 4: handle delayed rebalance minActiveReplica
+    // Note this result is one step branching from the main calculation and SHOULD NOT be persisted -- it is temporary,
+    // and only applies during the delayed window of those offline yet active nodes; a definitive resolution will happen
+    // once the node comes back or remains offline after the delayed window.
+    Map<String, ResourceAssignment> assignmentWithDelayedRebalanceAdjust = newAssignment;
+    if (_partialRebalanceRunner.isAsyncPartialRebalanceEnabled()) {

Review Comment:
   Got it. I was talking about the method name, but I get your point: async vs. sync.





[GitHub] [helix] qqu0127 commented on a diff in pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1173872450


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -335,6 +335,42 @@ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica
     return toBeAssignedReplicas;
   }
 
+  /**
+   * Merge entries from currentResourceAssignment to newAssignment.
+   * To handle minActiveReplica for delayed rebalance, new assignment is computed based on enabled live instances, but
+   * could miss out current partition allocation still on offline instances (within delayed window).
+   * The merge process is independent for each resource; for each resource-partition, it adds the <instance, state> pair
+   * to newAssignment if it's not there yet; in other words, the entries in newAssignment won't be overridden.
+   * @param newAssignment newAssignment to merge, this map is getting updated during this method.
+   * @param currentResourceAssignment the current resource assignment
+   * @param enabledLiveInstances the set of enabled live instance
+   */
+  public static void mergeAssignments(Map<String, ResourceAssignment> newAssignment,
+      Map<String, ResourceAssignment> currentResourceAssignment,
+      Set<String> enabledLiveInstances) {
+    // merge with current assignment for partitions assigned on rest of the instances (not immediately live)
+    currentResourceAssignment.entrySet().parallelStream().forEach(entry -> {
+      String resourceName = entry.getKey();
+      ResourceAssignment currentAssignment = entry.getValue();
+      for (Partition partition : currentAssignment.getMappedPartitions()) {
+        currentAssignment.getReplicaMap(partition).entrySet().stream() // <instance, state>
+            // the existing partitions on the enabledLiveInstances are pre-allocated, only process for the rest
+            .filter(e -> !enabledLiveInstances.contains(e.getKey()) ||
+                !newAssignment.containsKey(resourceName) ||
+                !newAssignment.get(resourceName).getReplicaMap(partition).containsKey(e.getKey()))

Review Comment:
   I see your point. I thought about a similar approach too. Unfortunately, the current logic in ClusterModel requires that we pre-allocate existing partitions; otherwise the new assignment calculation won't honor the constraints. The current logic is to pass on the existing assignment.
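
   To illustrate why pre-allocation matters, here is a minimal sketch with a single capacity constraint. It does not use ClusterModel; `Node`, `buildNodes`, and the integer weights are hypothetical names made up for this example. If existing replicas are not replayed onto the nodes first, a node looks emptier than it is and a constraint check passes when it should fail.

```java
import java.util.HashMap;
import java.util.Map;

final class PreAllocationSketch {
  /** Hypothetical node model: a fixed capacity and the weight already placed on it. */
  static final class Node {
    final int capacity;
    int used;

    Node(int capacity) {
      this.capacity = capacity;
    }

    boolean canHost(int weight) {
      return used + weight <= capacity;
    }

    void assign(int weight) {
      used += weight;
    }
  }

  /** Builds nodes and, when requested, replays the existing replica weights onto them first. */
  static Map<String, Node> buildNodes(Map<String, Integer> capacities,
      Map<String, Integer> existingWeightPerNode, boolean preAllocate) {
    Map<String, Node> nodes = new HashMap<>();
    capacities.forEach((name, capacity) -> nodes.put(name, new Node(capacity)));
    if (preAllocate) {
      existingWeightPerNode.forEach((name, weight) -> {
        Node node = nodes.get(name);
        if (node != null) {
          node.assign(weight);
        }
      });
    }
    return nodes;
  }

  public static void main(String[] args) {
    Map<String, Integer> capacities = Map.of("node1", 10);
    Map<String, Integer> existing = Map.of("node1", 8);
    // Without pre-allocation the node looks empty, so a replica of weight 5 is wrongly accepted.
    System.out.println(buildNodes(capacities, existing, false).get("node1").canHost(5)); // true
    // With pre-allocation only 2 units remain, so the same replica is rejected.
    System.out.println(buildNodes(capacities, existing, true).get("node1").canHost(5)); // false
  }
}
```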





[GitHub] [helix] qqu0127 commented on a diff in pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1173941677


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -335,6 +335,42 @@ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica
     return toBeAssignedReplicas;
   }
 
+  /**
+   * Merge entries from currentResourceAssignment to newAssignment.
+   * To handle minActiveReplica for delayed rebalance, new assignment is computed based on enabled live instances, but
+   * could miss out current partition allocation still on offline instances (within delayed window).
+   * The merge process is independent for each resource; for each resource-partition, it adds the <instance, state> pair
+   * to newAssignment if it's not there yet; in other words, the entries in newAssignment won't be overridden.
+   * @param newAssignment newAssignment to merge, this map is getting updated during this method.
+   * @param currentResourceAssignment the current resource assignment
+   * @param enabledLiveInstances the set of enabled live instance
+   */
+  public static void mergeAssignments(Map<String, ResourceAssignment> newAssignment,
+      Map<String, ResourceAssignment> currentResourceAssignment,
+      Set<String> enabledLiveInstances) {
+    // merge with current assignment for partitions assigned on rest of the instances (not immediately live)
+    currentResourceAssignment.entrySet().parallelStream().forEach(entry -> {
+      String resourceName = entry.getKey();
+      ResourceAssignment currentAssignment = entry.getValue();
+      for (Partition partition : currentAssignment.getMappedPartitions()) {
+        currentAssignment.getReplicaMap(partition).entrySet().stream() // <instance, state>
+            // the existing partitions on the enabledLiveInstances are pre-allocated, only process for the rest
+            .filter(e -> !enabledLiveInstances.contains(e.getKey()) ||
+                !newAssignment.containsKey(resourceName) ||
+                !newAssignment.get(resourceName).getReplicaMap(partition).containsKey(e.getKey()))

Review Comment:
   Updated a bit: we can filter out just the new assignment and merge on that only.





[GitHub] [helix] qqu0127 commented on a diff in pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1171705756


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -335,6 +335,42 @@ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica
     return toBeAssignedReplicas;
   }
 
+  /**
+   * Merge entries from currentResourceAssignment to newAssignment.
+   * To handle minActiveReplica for delayed rebalance, new assignment is computed based on enabled live instances, but
+   * could miss out current partition allocation still on offline instances (within delayed window).
+   * The merge process is independent for each resource; for each resource-partition, it adds the <instance, state> pair
+   * to newAssignment if it's not there yet; in other words, the entries in newAssignment won't be overridden.
+   * @param newAssignment newAssignment to merge, this map is getting updated during this method.
+   * @param currentResourceAssignment the current resource assignment
+   * @param enabledLiveInstances the set of enabled live instance
+   */
+  public static void mergeAssignments(Map<String, ResourceAssignment> newAssignment,
+      Map<String, ResourceAssignment> currentResourceAssignment,
+      Set<String> enabledLiveInstances) {
+    // merge with current assignment for partitions assigned on rest of the instances (not immediately live)
+    currentResourceAssignment.entrySet().parallelStream().forEach(entry -> {
+      String resourceName = entry.getKey();
+      ResourceAssignment currentAssignment = entry.getValue();
+      for (Partition partition : currentAssignment.getMappedPartitions()) {
+        currentAssignment.getReplicaMap(partition).entrySet().stream() // <instance, state>
+            // the existing partitions on the enabledLiveInstances are pre-allocated, only process for the rest
+            .filter(e -> !enabledLiveInstances.contains(e.getKey()) ||
+                !newAssignment.containsKey(resourceName) ||
+                !newAssignment.get(resourceName).getReplicaMap(partition).containsKey(e.getKey()))

Review Comment:
   It's a little complicated. The computed assignment contains both the existing allocation and the newly computed assignment, BUT that only applies to the "assignable" nodes, which in this case are only the immediately live nodes.
   Some unit tests use https://github.com/apache/helix/blob/master/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java, which doesn't assign the current allocation, and that can be confusing.
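
   The merge contract described in the Javadoc can be sketched roughly as below. This is a simplified illustration, not the code in this PR: nested plain maps (resource -> partition -> instance -> state) stand in for `ResourceAssignment`, the class name is hypothetical, the loop is sequential rather than a parallel stream, and it assumes the computed assignment is authoritative for enabled live instances, so only replicas on the remaining instances are carried over.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class MergeAssignmentsSketch {
  /** Copies <instance, state> pairs for non-live instances into newAssignment without overriding. */
  static void mergeAssignments(
      Map<String, Map<String, Map<String, String>>> newAssignment,
      Map<String, Map<String, Map<String, String>>> currentAssignment,
      Set<String> enabledLiveInstances) {
    currentAssignment.forEach((resource, partitions) ->
        partitions.forEach((partition, replicas) ->
            replicas.forEach((instance, state) -> {
              // Assumption: replicas on enabled live instances are already covered by the computation.
              if (enabledLiveInstances.contains(instance)) {
                return;
              }
              // Add the pair only if absent, so existing entries in newAssignment are kept.
              newAssignment
                  .computeIfAbsent(resource, r -> new HashMap<>())
                  .computeIfAbsent(partition, p -> new HashMap<>())
                  .putIfAbsent(instance, state);
            })));
  }

  public static void main(String[] args) {
    Map<String, Map<String, Map<String, String>>> current = new HashMap<>();
    current.computeIfAbsent("db", r -> new HashMap<>())
        .computeIfAbsent("p0", p -> new HashMap<>()).put("offlineNode", "SLAVE");
    Map<String, Map<String, Map<String, String>>> computed = new HashMap<>();
    computed.computeIfAbsent("db", r -> new HashMap<>())
        .computeIfAbsent("p0", p -> new HashMap<>()).put("liveNode", "MASTER");
    mergeAssignments(computed, current, Set.of("liveNode"));
    // Partition p0 now lists both liveNode and offlineNode.
    System.out.println(computed);
  }
}
```

   With this shape the work is proportional to the size of the current assignment, which is the tradeoff discussed earlier in the thread.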





[GitHub] [helix] qqu0127 commented on a diff in pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1171707000


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java:
##########
@@ -432,6 +479,17 @@ protected Map<String, ResourceAssignment> emergencyRebalance(
 
     // Step 3: persist result to metadata store
     persistBestPossibleAssignment(newAssignment);
+
+    // Step 4: handle delayed rebalance minActiveReplica
+    // Note this result is one step branching from the main calculation and SHOULD NOT be persisted -- it is temporary,
+    // and only applies during the delayed window of those offline yet active nodes; a definitive resolution will happen
+    // once the node comes back or remains offline after the delayed window.
+    Map<String, ResourceAssignment> assignmentWithDelayedRebalanceAdjust = newAssignment;
+    if (_partialRebalanceRunner.isAsyncPartialRebalanceEnabled()) {
+      assignmentWithDelayedRebalanceAdjust =
+          handleDelayedRebalanceMinActiveReplica(clusterData, resourceMap, activeNodes, newAssignment, algorithm);
+    }

Review Comment:
   No, the check is still there, inside `handleDelayedRebalanceMinActiveReplica`. If it's not needed, the current assignment is returned.





[GitHub] [helix] qqu0127 commented on a diff in pull request #2447: WAGED rebalance overwrite redesign -- part 2

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2447:
URL: https://github.com/apache/helix/pull/2447#discussion_r1173812825


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -311,30 +311,70 @@ public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica
     Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
 
     for (String resourceName : resources) {
-      // <partition, <state, instances set>>
-      Map<String, Map<String, Set<String>>> stateInstanceMap =
-          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
       ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
-      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      IdealState idealState = clusterData.getIdealState(resourceName);
+      String modelDef = idealState.getStateModelDefRef();
       Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      ResourceConfig mergedResourceConfig =
+          ResourceConfig.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName), idealState);

Review Comment:
   It seems to be for backward compatibility. I followed the convention of merging the resource config, and it turns out the resource config in clusterData might be null.
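
   As a rough illustration of a merge that tolerates a missing resource config, here is a sketch using plain string maps instead of `ResourceConfig` and `IdealState`. The class name and the field keys are hypothetical, and the precedence rule (resource-config values win when present) is an assumption of this sketch rather than something confirmed in this thread.

```java
import java.util.HashMap;
import java.util.Map;

final class NullSafeConfigMergeSketch {
  /** Starts from the ideal-state fields and overlays resource-config fields only when a config exists. */
  static Map<String, String> merge(Map<String, String> resourceConfigFields,
      Map<String, String> idealStateFields) {
    Map<String, String> merged = new HashMap<>(idealStateFields);
    if (resourceConfigFields != null) {
      merged.putAll(resourceConfigFields);
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> idealState = Map.of("MIN_ACTIVE_REPLICAS", "2", "REPLICAS", "3");
    // No resource config for this resource: the ideal-state values are used as-is.
    System.out.println(merge(null, idealState).get("MIN_ACTIVE_REPLICAS"));
    // A resource config is present: its value takes precedence in this sketch.
    System.out.println(merge(Map.of("MIN_ACTIVE_REPLICAS", "1"), idealState).get("MIN_ACTIVE_REPLICAS"));
  }
}
```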


