Posted to reviews@helix.apache.org by "qqu0127 (via GitHub)" <gi...@apache.org> on 2023/04/11 16:00:20 UTC

[GitHub] [helix] qqu0127 opened a new pull request, #2444: WAGED rebalance overwrite redesign -- part 1

qqu0127 opened a new pull request, #2444:
URL: https://github.com/apache/helix/pull/2444

   ### Issues
   
   - [X] My PR addresses the following Helix issues and references them in the PR description:
   https://github.com/apache/helix/issues/2441
   
   ### Description
   
   - [X] Here are some details about my PR, including screenshots of any UI changes:
   
   This is the first PR to address the hard-constraint issue during WAGED rebalance overwrites.
   The general idea is to use the WAGED constraint-based algorithm to handle "rebalance overwrites" instead of applying them blindly as an afterthought.
   
   This PR proposes a new rebalance scope, its cluster model, and a few util methods. It is NOT yet integrated with any current flow, so this PR introduces no behavior change.
   
   The new rebalance scope:
   1. Assignable replicas belong to partitions that fall below minActiveReplicas; additional replicas need to be brought up for those partitions.
   2. The number of additional replicas is determined by the current `numReplica` and `minActiveReplica`, and the corresponding states are determined by the state model definition (state count map).
   3. All current assignments on active nodes are pre-allocated, so no existing assignment is changed.
   
   The entry point for the logic is `ClusterModelProvider::generateClusterModelForDelayedRebalanceOverwrites`; the major new logic is introduced in `DelayedRebalanceUtil.java`.
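
   A minimal sketch of rules 1 and 2 above, not the code in this PR: plain Java collections stand in for the Helix types, and `ReplicaGapSketch` and `statesToBringUp` are hypothetical names. It assumes the target count per state has already been derived from the state model definition and is supplied in priority order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: plain collections stand in for Helix types such as
// AssignableReplica and StateModelDefinition. All names here are hypothetical.
final class ReplicaGapSketch {
  /**
   * Returns the states of the extra replicas to bring up for one partition.
   *
   * @param targetStateCounts states in priority order, mapped to the count each
   *                          state should have once minActiveReplica replicas are up
   * @param activeStateCounts replicas per state currently on live, enabled instances
   * @param minActiveReplica  the minimum number of active replicas required
   */
  static List<String> statesToBringUp(LinkedHashMap<String, Integer> targetStateCounts,
      Map<String, Integer> activeStateCounts, int minActiveReplica) {
    int active = activeStateCounts.values().stream().mapToInt(Integer::intValue).sum();
    int gap = minActiveReplica - active;
    List<String> result = new ArrayList<>();
    // Walk the states in priority order and fill each one until the gap is closed.
    for (Map.Entry<String, Integer> entry : targetStateCounts.entrySet()) {
      int missing = entry.getValue() - activeStateCounts.getOrDefault(entry.getKey(), 0);
      for (int i = 0; i < missing && gap > 0; i++) {
        result.add(entry.getKey());
        gap--;
      }
    }
    return result;
  }
}
```

   With a MASTER/SLAVE target of one replica each, only a SLAVE still active, and a `minActiveReplica` of 2, the sketch asks for a single MASTER replica.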
   
   ### Tests
   
   - [X] The following tests are written for this issue:
   helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
   
   - The following is the result of the "mvn test" command on the appropriate module:
   
   (If CI test fails due to known issue, please specify the issue and test PR locally. Then copy & paste the result of "mvn test" to here.)
   
   ### Changes that Break Backward Compatibility (Optional)
   
   - My PR contains changes that break backward compatibility or previous assumptions for certain methods or API. They include:
   
   (Consider including all behavior changes for public methods or API. Also include these changes in merge description so that other developers are aware of these changes. This allows them to make relevant code changes in feature branches accounting for the new method/API behavior.)
   
   ### Documentation (Optional)
   
   - In case of new functionality, my PR adds documentation in the following wiki page:
   
   (Link the GitHub wiki you added)
   
   ### Commits
   
   - My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Code Quality
   
   - My diff has been formatted using helix-style.xml 
   (helix-style-intellij.xml if IntelliJ IDE is used)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] qqu0127 commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1166152369


##########
helix-core/src/main/java/org/apache/helix/model/PartitionWithReplicaCount.java:
##########
@@ -0,0 +1,69 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Objects;
+
+
+/**
+ * A POJO class containing {@link Partition} with missing replicas.
+ */
+public class PartitionWithReplicaCount {

Review Comment:
   Ok, will change. Thanks.





[GitHub] [helix] qqu0127 commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1166017607


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,156 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that need to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<PartitionWithReplicaCount>> partitionsMissingMinActiveReplicas =
+        getPartitionsNeedForRebalanceOverwrites(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites
+      List<PartitionWithReplicaCount> partitions =
+          partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
+      if (partitions.isEmpty()) {
+        continue;
+      }
+      toBeAssignedReplicas.addAll(
+          findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
+    }
+    return toBeAssignedReplicas;
+  }
+
+  private static Map<String, List<PartitionWithReplicaCount>> getPartitionsNeedForRebalanceOverwrites(
+      ResourceControllerDataProvider clusterData,
+      Map<String, ResourceAssignment> currentAssignment) {
+    return currentAssignment.entrySet()
+        .parallelStream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> findPartitionsMissingMinActiveReplica(clusterData, entry.getValue())
+        ));
+  }
+
+  private static List<PartitionWithReplicaCount> findPartitionsMissingMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      ResourceAssignment resourceAssignment) {
+    String resourceName = resourceAssignment.getResourceName();
+    IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+    int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+        .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+            currentIdealState), currentIdealState, numReplica);
+    return resourceAssignment.getMappedPartitions()
+        .parallelStream()
+        .filter(partition -> {
+          long enabledLivePlacementCounter = resourceAssignment.getReplicaMap(partition).keySet()
+              .stream()
+              .filter(enabledLiveInstances::contains)
+              .count();
+          return enabledLivePlacementCounter < Math.min(minActiveReplica, numReplica);
+        })
+        .map(partition -> new PartitionWithReplicaCount(partition, minActiveReplica, numReplica))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * For the resource in the cluster, find additional AssignableReplica to close the gap on minActiveReplica.
+   * @param clusterData Cluster data cache.
+   * @param resourceName name of the resource
+   * @param partitions Pre-computed list of partition missing minActiveReplica
+   * @param stateInstanceMap <partition, <state, instances set>>
+   * @param liveEnabledInstances A set of live and enabled instances
+   * @return A set of AssignableReplica
+   */
+  private static Set<AssignableReplica> findAssignableReplicaForResource(
+      ResourceControllerDataProvider clusterData,
+      String resourceName,
+      List<PartitionWithReplicaCount> partitions,
+      Map<String, Map<String, Set<String>>> stateInstanceMap,
+      Set<String> liveEnabledInstances) {
+    LOG.info("Computing replicas requiring rebalance overwrite for resource: {}", resourceName);
+    final List<String> priorityOrderedStates = getPriorityOrderedStates(resourceName, clusterData);
+    final IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    final ResourceConfig resourceConfig = clusterData.getResourceConfig(resourceName);
+    final Map<String, Integer> statePriorityMap =
+        clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
+    final Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (PartitionWithReplicaCount partitionWithReplicaCount : partitions) {
+      String partitionName = partitionWithReplicaCount.getPartition().getPartitionName();
+      // count current active replicas of the partition
+      Map<String, Integer> activeStateReplicaCount = stateInstanceMap.getOrDefault(partitionName, Collections.emptyMap())
+          .entrySet()
+          .stream()
+          .collect(Collectors.toMap(Map.Entry::getKey,
+              e -> (int) e.getValue().stream().filter(liveEnabledInstances::contains).count()));
+      int activeReplicas = activeStateReplicaCount.values().stream().reduce(Integer::sum).orElse(0);
+      int minActiveReplica = partitionWithReplicaCount.getMinActiveReplica();
+      int replicaGapCount = minActiveReplica - activeReplicas;
+      if (replicaGapCount <= 0) {
+        // delayed rebalance overwrites isn't required, early stop and move on to next partition
+        continue;
+      }
+      // follow the state priority order, add additional replicas to close the gap on replica count
+      Map<String, Integer> stateCountMap = clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
+          .getStateCountMap(minActiveReplica, minActiveReplica);
+      // follow the priority order of states and prepare additional replicas to be assigned
+      for (String state : priorityOrderedStates) {
+        if (replicaGapCount <= 0) {
+          break;
+        }
+        int priority = statePriorityMap.get(state);
+        int curActiveStateCount = activeStateReplicaCount.getOrDefault(state, 0);
+        for (int i = 0; i < stateCountMap.get(state) - curActiveStateCount && replicaGapCount > 0; i++) {
+          toBeAssignedReplicas.add(
+              new AssignableReplica(clusterData.getClusterConfig(), resourceConfig, partitionName, state, priority));
+          replicaGapCount--;
+        }
+      }
+    }
+    LOG.info("Replicas: {} need to be brought up for rebalance overwrite.", toBeAssignedReplicas);
+    return toBeAssignedReplicas;
+  }
+
+  private static List<String> getPriorityOrderedStates(String resourceName, ResourceControllerDataProvider clusterData) {
+    IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    Map<String, Integer> statePriorityMap =
+        clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
+
+    List<String> priorityOrderedStates = new ArrayList<>(statePriorityMap.keySet());
+    priorityOrderedStates.sort(Comparator.comparing(a -> statePriorityMap.getOrDefault(a, Integer.MAX_VALUE)));
+    return priorityOrderedStates;

Review Comment:
   Thanks a lot. Will update





[GitHub] [helix] junkaixue commented on pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "junkaixue (via GitHub)" <gi...@apache.org>.
junkaixue commented on PR #2444:
URL: https://github.com/apache/helix/pull/2444#issuecomment-1503837311

   > My understanding is that the order is: 1) we compute the IdealMapping with WAGED, then 2) DelayAutoRebalance, and 3) this PR brings the min active replica override back. This is more like doing the computation and then patching it again...
   > 
   > My suggestion is to have a pass-in for N -> N + 1 with generic restrictions. For the default delay, nothing is passed in. For WAGED, we can pass in hard constraints to limit the adjustment.
   
   Synced offline. This is not a problem caused by N -> N + 1; it is a problem introduced by DelayRebalance. Concern resolved.




[GitHub] [helix] xyuanlu merged pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "xyuanlu (via GitHub)" <gi...@apache.org>.
xyuanlu merged PR #2444:
URL: https://github.com/apache/helix/pull/2444




[GitHub] [helix] desaikomal commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "desaikomal (via GitHub)" <gi...@apache.org>.
desaikomal commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1175628816


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +286,168 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that need to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * NOTE: This method also populates allocatedReplicas as it goes through all resources to preserve current allocation.
+   *
+   * @param clusterData Cluster data cache.
+   * @param resources A set of all resource names.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Set<String> resources,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<String>> partitionsMissingMinActiveReplicas =
+        findPartitionsMissingMinActiveReplica(clusterData, currentAssignment);
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : resources) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);

Review Comment:
   nit: just noticed, you could pass that as an argument in line 315.





[GitHub] [helix] qqu0127 commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1166082764


##########
helix-core/src/main/java/org/apache/helix/model/PartitionWithReplicaCount.java:
##########
@@ -0,0 +1,69 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Objects;
+
+
+/**
+ * A POJO class containing {@link Partition} with missing replicas.
+ */
+public class PartitionWithReplicaCount {

Review Comment:
   Yeah, I can replace the Partition with just its string name.
   
   I created this class to reduce some duplication and overhead, especially when computing minActiveReplica; it's cleaner to bundle things together.
   Regarding GC: it's non-static, it doesn't hold too many things (only partitions that failed minActiveReplica), and it's only held within the method invocation of `findToBeAssignedReplicasForMinActiveReplica`. I think we can give GC a chance.
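
   A minimal sketch of that bundling idea, with the `Partition` swapped for its name. `PartitionReplicaInfo`, its fields, and `replicaGap` are hypothetical names chosen for illustration and are not the class in this PR.

```java
import java.util.Objects;

// Hypothetical stand-in for the value class discussed here: it carries the
// partition name together with the replica thresholds computed once per resource.
final class PartitionReplicaInfo {
  private final String partitionName;
  private final int minActiveReplica;
  private final int numReplica;

  PartitionReplicaInfo(String partitionName, int minActiveReplica, int numReplica) {
    this.partitionName = Objects.requireNonNull(partitionName, "partitionName");
    this.minActiveReplica = minActiveReplica;
    this.numReplica = numReplica;
  }

  String getPartitionName() {
    return partitionName;
  }

  int getMinActiveReplica() {
    return minActiveReplica;
  }

  int getNumReplica() {
    return numReplica;
  }

  /** Replicas still needed, given how many are already on live, enabled instances. */
  int replicaGap(int activeReplicas) {
    // The threshold is capped by numReplica, and the gap is never negative.
    return Math.max(0, Math.min(minActiveReplica, numReplica) - activeReplicas);
  }
}
```

   Instances like this only need to live for the duration of one computation, which is the scope the GC argument above relies on.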





[GitHub] [helix] qqu0127 commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1167188942


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,157 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that need to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<String>> partitionsMissingMinActiveReplicas =
+        findPartitionsMissingMinActiveReplica(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {

Review Comment:
   As the comment `keep all current assignment and add to allocated replicas` says, we should populate `allocatedReplicas` to preserve the current assignment, and this requires going through all resources, not just the resource-partitions missing minActiveReplica.
   
   Let me add more comments to clarify, thanks
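
   A hedged sketch of why the loop covers every resource, using plain collections instead of Helix types. `PreserveAllocationSketch`, `collect`, and the `"resource/partition"` string encoding are assumptions made for illustration only.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative only: strings stand in for AssignableReplica and ResourceAssignment.
final class PreserveAllocationSketch {
  /**
   * @param currentAssignment resource -> partition -> instances hosting a replica
   * @param partitionsMissing resource -> partitions below minActiveReplica
   * @param allocated         output map: instance -> "resource/partition" entries it hosts
   * @return the resources that still need additional replicas
   */
  static Set<String> collect(Map<String, Map<String, Set<String>>> currentAssignment,
      Map<String, List<String>> partitionsMissing,
      Map<String, Set<String>> allocated) {
    Set<String> needWork = new TreeSet<>();
    for (Map.Entry<String, Map<String, Set<String>>> resource : currentAssignment.entrySet()) {
      // Always record the existing placement, even for healthy resources, so
      // that the current assignment is treated as already allocated.
      resource.getValue().forEach((partition, instances) ->
          instances.forEach(instance ->
              allocated.computeIfAbsent(instance, key -> new HashSet<>())
                  .add(resource.getKey() + "/" + partition)));
      // Only resources with a shortfall go on to the replica computation.
      if (!partitionsMissing.getOrDefault(resource.getKey(), Collections.emptyList()).isEmpty()) {
        needWork.add(resource.getKey());
      }
    }
    return needWork;
  }
}
```

   Skipping the healthy resources in this loop would leave their replicas out of `allocated`, so the instances hosting them would look emptier than they are.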





[GitHub] [helix] qqu0127 commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1167228513


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,157 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that need to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<String>> partitionsMissingMinActiveReplicas =
+        findPartitionsMissingMinActiveReplica(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites
+      List<String> partitions =
+          partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
+      if (partitions.isEmpty()) {
+        continue;
+      }
+      toBeAssignedReplicas.addAll(
+          findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
+    }
+    return toBeAssignedReplicas;
+  }
+
+  private static Map<String, List<String>> findPartitionsMissingMinActiveReplica(

Review Comment:
   yes, just added some comments, thanks.





[GitHub] [helix] junkaixue commented on pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "junkaixue (via GitHub)" <gi...@apache.org>.
junkaixue commented on PR #2444:
URL: https://github.com/apache/helix/pull/2444#issuecomment-1503705609

   My understanding is that the order is: 1) we compute the IdealMapping with WAGED, then 2) DelayAutoRebalance, and 3) this PR brings the min active replica override back. This is more like doing the computation and then patching it again...
   
   My suggestion is to have a pass-in for N -> N + 1 with generic restrictions. For the default delay, nothing is passed in. For WAGED, we can pass in hard constraints to limit the adjustment.
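
   One possible reading of this suggestion, as a rough sketch only: the restriction is modeled as a `Predicate` over candidate instance names. `ConstraintPassInSketch` and `pickInstance` are hypothetical and do not correspond to any existing Helix API.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Illustrative only: a restriction is any test over a candidate instance name.
final class ConstraintPassInSketch {
  /** Picks the first candidate instance accepted by the supplied restriction. */
  static Optional<String> pickInstance(List<String> candidates, Predicate<String> restriction) {
    return candidates.stream().filter(restriction).findFirst();
  }

  /** Default delayed rebalance: nothing is passed in, so every candidate is acceptable. */
  static Optional<String> pickInstance(List<String> candidates) {
    return pickInstance(candidates, instance -> true);
  }
}
```

   Under this reading, a WAGED caller would pass a predicate that rejects instances violating a hard constraint, while the default delayed rebalancer would use the single-argument overload.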




[GitHub] [helix] junkaixue commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "junkaixue (via GitHub)" <gi...@apache.org>.
junkaixue commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1166056869


##########
helix-core/src/main/java/org/apache/helix/model/PartitionWithReplicaCount.java:
##########
@@ -0,0 +1,69 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Objects;
+
+
+/**
+ * A POJO class containing {@link Partition} with missing replicas.
+ */
+public class PartitionWithReplicaCount {

Review Comment:
   NIT: Why do we need this object? It only wraps the partitions stored in the List. The only extra need is to get the replica count. We can store a list of Partition, or just the partition name as a String. When we compute, we can get MinActiveReplica easily, right? Also, equals is not necessary here, right?
   
   I am a little worried that too many objects can cause GCs on each pipeline run, as these objects are not held in memory and are continuously updated.
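
A rough sketch of the alternative, assuming partitions are tracked by name and the replica settings are looked up once per resource. The types and the gap formula `min(minActiveReplica, numReplica) - live` are assumptions made for illustration, not code from this PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; these types are stand-ins, not Helix classes.
final class PartitionNameOnlySketch {

  /** Replica settings kept once per resource instead of being copied into every partition. */
  static final class ReplicaSettings {
    final int minActiveReplica;
    final int numReplica;

    ReplicaSettings(int minActiveReplica, int numReplica) {
      this.minActiveReplica = minActiveReplica;
      this.numReplica = numReplica;
    }
  }

  /** Extra replicas needed, assuming the gap is min(minActiveReplica, numReplica) - live. */
  static int missingReplicas(ReplicaSettings settings, int liveReplicaCount) {
    int required = Math.min(settings.minActiveReplica, settings.numReplica);
    return Math.max(0, required - liveReplicaCount);
  }

  public static void main(String[] args) {
    Map<String, ReplicaSettings> settingsByResource = new HashMap<>();
    settingsByResource.put("resourceA", new ReplicaSettings(2, 3));

    // Only partition names are stored; no per-partition wrapper object is allocated.
    Map<String, List<String>> partitionsMissingByResource = new HashMap<>();
    partitionsMissingByResource.put("resourceA", Arrays.asList("resourceA_0", "resourceA_1"));

    partitionsMissingByResource.forEach((resource, partitions) -> {
      ReplicaSettings settings = settingsByResource.get(resource);
      for (String partition : partitions) {
        // The live replica count of 1 is a made-up value for the demo.
        System.out.println(partition + " needs " + missingReplicas(settings, 1) + " more replica(s)");
      }
    });
  }
}
```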





[GitHub] [helix] qqu0127 commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1168778389


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +286,168 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * NOTE: This method also populates allocatedReplicas as it goes through all resources to preserve current allocation.
+   *
+   * @param clusterData Cluster data cache.
+   * @param resources A set all resource names.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.

Review Comment:
   In most cases we start with an empty map and this method adds elements to it, so I'd prefer the word populated over updated. Thanks.
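
To illustrate the "populated" wording, here is a simplified sketch of the pattern, with plain strings standing in for `AssignableReplica` and a nested map standing in for `ResourceAssignment`. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; strings stand in for the real Helix model objects.
final class PopulateAllocatedReplicasSketch {

  /**
   * Adds every current placement to the caller-supplied map.
   * @param currentAssignment partition name -> (instance name -> state)
   * @param allocatedReplicas instance name -> replicas; typically empty on entry and filled here
   */
  static void populate(Map<String, Map<String, String>> currentAssignment,
      Map<String, Set<String>> allocatedReplicas) {
    currentAssignment.forEach((partition, replicaMap) ->
        replicaMap.forEach((instance, state) ->
            allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
                .add(partition + ":" + state)));
  }

  public static void main(String[] args) {
    Map<String, String> replicasOfP0 = new HashMap<>();
    replicasOfP0.put("host1", "MASTER");
    replicasOfP0.put("host2", "SLAVE");
    Map<String, Map<String, String>> current = new HashMap<>();
    current.put("p0", replicasOfP0);

    Map<String, Set<String>> allocated = new HashMap<>(); // starts empty
    populate(current, allocated);
    System.out.println(allocated);
  }
}
```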





[GitHub] [helix] desaikomal commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "desaikomal (via GitHub)" <gi...@apache.org>.
desaikomal commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1166110025


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,156 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<PartitionWithReplicaCount>> partitionsMissingMinActiveReplicas =
+        getPartitionsNeedForRebalanceOverwrites(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites

Review Comment:
   Does this mean that these partitions are still within the delayed window, or are they marked permanently down?



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,157 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<String>> partitionsMissingMinActiveReplicas =
+        findPartitionsMissingMinActiveReplica(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {

Review Comment:
   note: Since we are using partitionsMissingMinActiveReplicas, why are we iterating again?



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,156 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<PartitionWithReplicaCount>> partitionsMissingMinActiveReplicas =
+        getPartitionsNeedForRebalanceOverwrites(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites
+      List<PartitionWithReplicaCount> partitions =
+          partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
+      if (partitions.isEmpty()) {
+        continue;
+      }
+      toBeAssignedReplicas.addAll(
+          findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
+    }
+    return toBeAssignedReplicas;
+  }
+
+  private static Map<String, List<PartitionWithReplicaCount>> getPartitionsNeedForRebalanceOverwrites(
+      ResourceControllerDataProvider clusterData,
+      Map<String, ResourceAssignment> currentAssignment) {
+    return currentAssignment.entrySet()
+        .parallelStream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> findPartitionsMissingMinActiveReplica(clusterData, entry.getValue())
+        ));
+  }
+
+  private static List<PartitionWithReplicaCount> findPartitionsMissingMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      ResourceAssignment resourceAssignment) {
+    String resourceName = resourceAssignment.getResourceName();
+    IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+    int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+        .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+            currentIdealState), currentIdealState, numReplica);
+    return resourceAssignment.getMappedPartitions()
+        .parallelStream()
+        .filter(partition -> {
+          long enabledLivePlacementCounter = resourceAssignment.getReplicaMap(partition).keySet()
+              .stream()
+              .filter(enabledLiveInstances::contains)
+              .count();
+          return enabledLivePlacementCounter < Math.min(minActiveReplica, numReplica);
+        })
+        .map(partition -> new PartitionWithReplicaCount(partition, minActiveReplica, numReplica))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * For the resource in the cluster, find additional AssignableReplica to close the gap on minActiveReplica.
+   * @param clusterData Cluster data cache.
+   * @param resourceName name of the resource
+   * @param partitions Pre-computed list of partition missing minActiveReplica
+   * @param stateInstanceMap <partition, <state, instances set>>
+   * @param liveEnabledInstances A set of live and enabled instances
+   * @return A set of AssignableReplica
+   */
+  private static Set<AssignableReplica> findAssignableReplicaForResource(

Review Comment:
   How is this function different from `findToBeAssignedReplicasForMinActiveReplica`?



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,157 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<String>> partitionsMissingMinActiveReplicas =
+        findPartitionsMissingMinActiveReplica(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites
+      List<String> partitions =
+          partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
+      if (partitions.isEmpty()) {
+        continue;
+      }
+      toBeAssignedReplicas.addAll(
+          findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
+    }
+    return toBeAssignedReplicas;
+  }
+
+  private static Map<String, List<String>> findPartitionsMissingMinActiveReplica(

Review Comment:
   nit: can you please add a comment?



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,156 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<PartitionWithReplicaCount>> partitionsMissingMinActiveReplicas =
+        getPartitionsNeedForRebalanceOverwrites(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites
+      List<PartitionWithReplicaCount> partitions =
+          partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
+      if (partitions.isEmpty()) {
+        continue;
+      }
+      toBeAssignedReplicas.addAll(
+          findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
+    }
+    return toBeAssignedReplicas;
+  }
+
+  private static Map<String, List<PartitionWithReplicaCount>> getPartitionsNeedForRebalanceOverwrites(
+      ResourceControllerDataProvider clusterData,
+      Map<String, ResourceAssignment> currentAssignment) {
+    return currentAssignment.entrySet()
+        .parallelStream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> findPartitionsMissingMinActiveReplica(clusterData, entry.getValue())
+        ));
+  }
+
+  private static List<PartitionWithReplicaCount> findPartitionsMissingMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      ResourceAssignment resourceAssignment) {
+    String resourceName = resourceAssignment.getResourceName();
+    IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+    int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+        .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+            currentIdealState), currentIdealState, numReplica);
+    return resourceAssignment.getMappedPartitions()

Review Comment:
   Note to myself: I need to understand this lambda processing better...
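
For reference, a simplified sketch that puts a stream pipeline of the same shape next to an equivalent plain loop. It assumes the assignment is reduced to a map from partition name to the instances holding it; the class and method names are hypothetical and the real code works on `ResourceAssignment` instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch; a plain map stands in for ResourceAssignment.
final class LiveReplicaFilterSketch {

  /** Stream form: keep partitions whose live placements are below the required count. */
  static List<String> withStreams(Map<String, Set<String>> instancesByPartition,
      Set<String> liveInstances, int minActiveReplica, int numReplica) {
    return instancesByPartition.entrySet().stream()
        .filter(entry -> {
          // Count only the placements that sit on live instances.
          long liveCount = entry.getValue().stream().filter(liveInstances::contains).count();
          return liveCount < Math.min(minActiveReplica, numReplica);
        })
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  /** Loop form: the same logic written without lambdas. */
  static List<String> withLoops(Map<String, Set<String>> instancesByPartition,
      Set<String> liveInstances, int minActiveReplica, int numReplica) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, Set<String>> entry : instancesByPartition.entrySet()) {
      long liveCount = 0;
      for (String instance : entry.getValue()) {
        if (liveInstances.contains(instance)) {
          liveCount++;
        }
      }
      if (liveCount < Math.min(minActiveReplica, numReplica)) {
        result.add(entry.getKey());
      }
    }
    return result;
  }
}
```

Both methods return the partitions in the iteration order of the input map, so they agree on any input when run sequentially.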





[GitHub] [helix] qqu0127 commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1167076357


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,156 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<PartitionWithReplicaCount>> partitionsMissingMinActiveReplicas =
+        getPartitionsNeedForRebalanceOverwrites(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites
+      List<PartitionWithReplicaCount> partitions =
+          partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
+      if (partitions.isEmpty()) {
+        continue;
+      }
+      toBeAssignedReplicas.addAll(
+          findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
+    }
+    return toBeAssignedReplicas;
+  }
+
+  private static Map<String, List<PartitionWithReplicaCount>> getPartitionsNeedForRebalanceOverwrites(
+      ResourceControllerDataProvider clusterData,
+      Map<String, ResourceAssignment> currentAssignment) {
+    return currentAssignment.entrySet()
+        .parallelStream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> findPartitionsMissingMinActiveReplica(clusterData, entry.getValue())
+        ));
+  }
+
+  private static List<PartitionWithReplicaCount> findPartitionsMissingMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      ResourceAssignment resourceAssignment) {
+    String resourceName = resourceAssignment.getResourceName();
+    IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+    int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+        .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+            currentIdealState), currentIdealState, numReplica);
+    return resourceAssignment.getMappedPartitions()
+        .parallelStream()
+        .filter(partition -> {
+          long enabledLivePlacementCounter = resourceAssignment.getReplicaMap(partition).keySet()
+              .stream()
+              .filter(enabledLiveInstances::contains)
+              .count();
+          return enabledLivePlacementCounter < Math.min(minActiveReplica, numReplica);
+        })
+        .map(partition -> new PartitionWithReplicaCount(partition, minActiveReplica, numReplica))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * For the resource in the cluster, find additional AssignableReplica to close the gap on minActiveReplica.
+   * @param clusterData Cluster data cache.
+   * @param resourceName name of the resource
+   * @param partitions Pre-computed list of partition missing minActiveReplica
+   * @param stateInstanceMap <partition, <state, instances set>>
+   * @param liveEnabledInstances A set of live and enabled instances
+   * @return A set of AssignableReplica
+   */
+  private static Set<AssignableReplica> findAssignableReplicaForResource(

Review Comment:
   Line 383 is for a single resource, while `findToBeAssignedReplicasForMinActiveReplica` is for all resources. That method also does more than that (it populates the map `allocatedReplicas`). I will add more comments there.





[GitHub] [helix] junkaixue commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "junkaixue (via GitHub)" <gi...@apache.org>.
junkaixue commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1165995219


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java:
##########
@@ -53,7 +54,31 @@ private enum RebalanceScopeType {
     // changes.
     GLOBAL_BASELINE,
     // Set the rebalance scope to cover only replicas that are assigned to downed instances.
-    EMERGENCY
+    EMERGENCY,
+    // A temporary overwrites for partition replicas on downed instance but still within the delayed window but missing
+    // minActiveReplicas
+    DELAYED_REBALANCE_OVERWRITES
+  }
+
+  /**
+   * TODO: On integration with WAGED, have to integrate with counter and latency metrics -- qqu
+   * Compute a new Cluster Model with scope limited to partitions with best possible assignment missing minActiveReplicas
+   * because of delayed rebalance setting.
+   * @param dataProvider The controller's data cache
+   * @param resourceMap The full map of the resource by name
+   * @param activeInstances The active instances that will be used in the calculation.
+   * @param resourceAssignment The resource assignment state to compute on. This should be the current state assignment;
+   *                           if it's run right after another rebalance calculation, the best possible assignment from
+   *                           previous result can be used.
+   * @return the ClusterModel
+   */
+  public static ClusterModel generateClusterModelForDelayedRebalanceOverwrites(

Review Comment:
   I guess hooking up the call will be part 2, right?
   
   But logically, I am not sure whether this is the right place. The hierarchy is:
   
   rebalance -> Baseline
             -> Partial
             -> Emergency
   
   But overrides caused by the delay sound more like part of the partial rebalance. Having a separate entry here could be weird.



##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,156 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<PartitionWithReplicaCount>> partitionsMissingMinActiveReplicas =
+        getPartitionsNeedForRebalanceOverwrites(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites
+      List<PartitionWithReplicaCount> partitions =
+          partitionsMissingMinActiveReplicas.getOrDefault(resourceName, Collections.emptyList());
+      if (partitions.isEmpty()) {
+        continue;
+      }
+      toBeAssignedReplicas.addAll(
+          findAssignableReplicaForResource(clusterData, resourceName, partitions, stateInstanceMap, liveEnabledInstances));
+    }
+    return toBeAssignedReplicas;
+  }
+
+  private static Map<String, List<PartitionWithReplicaCount>> getPartitionsNeedForRebalanceOverwrites(
+      ResourceControllerDataProvider clusterData,
+      Map<String, ResourceAssignment> currentAssignment) {
+    return currentAssignment.entrySet()
+        .parallelStream()
+        .collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> findPartitionsMissingMinActiveReplica(clusterData, entry.getValue())
+        ));
+  }
+
+  private static List<PartitionWithReplicaCount> findPartitionsMissingMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      ResourceAssignment resourceAssignment) {
+    String resourceName = resourceAssignment.getResourceName();
+    IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+    int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+        .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+            currentIdealState), currentIdealState, numReplica);
+    return resourceAssignment.getMappedPartitions()
+        .parallelStream()
+        .filter(partition -> {
+          long enabledLivePlacementCounter = resourceAssignment.getReplicaMap(partition).keySet()
+              .stream()
+              .filter(enabledLiveInstances::contains)
+              .count();
+          return enabledLivePlacementCounter < Math.min(minActiveReplica, numReplica);
+        })
+        .map(partition -> new PartitionWithReplicaCount(partition, minActiveReplica, numReplica))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * For the resource in the cluster, find additional AssignableReplica to close the gap on minActiveReplica.
+   * @param clusterData Cluster data cache.
+   * @param resourceName name of the resource
+   * @param partitions Pre-computed list of partition missing minActiveReplica
+   * @param stateInstanceMap <partition, <state, instances set>>
+   * @param liveEnabledInstances A set of live and enabled instances
+   * @return A set of AssignableReplica
+   */
+  private static Set<AssignableReplica> findAssignableReplicaForResource(
+      ResourceControllerDataProvider clusterData,
+      String resourceName,
+      List<PartitionWithReplicaCount> partitions,
+      Map<String, Map<String, Set<String>>> stateInstanceMap,
+      Set<String> liveEnabledInstances) {
+    LOG.info("Computing replicas requiring rebalance overwrite for resource: {}", resourceName);
+    final List<String> priorityOrderedStates = getPriorityOrderedStates(resourceName, clusterData);
+    final IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    final ResourceConfig resourceConfig = clusterData.getResourceConfig(resourceName);
+    final Map<String, Integer> statePriorityMap =
+        clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
+    final Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (PartitionWithReplicaCount partitionWithReplicaCount : partitions) {
+      String partitionName = partitionWithReplicaCount.getPartition().getPartitionName();
+      // count current active replicas of the partition
+      Map<String, Integer> activeStateReplicaCount = stateInstanceMap.getOrDefault(partitionName, Collections.emptyMap())
+          .entrySet()
+          .stream()
+          .collect(Collectors.toMap(Map.Entry::getKey,
+              e -> (int) e.getValue().stream().filter(liveEnabledInstances::contains).count()));
+      int activeReplicas = activeStateReplicaCount.values().stream().reduce(Integer::sum).orElse(0);
+      int minActiveReplica = partitionWithReplicaCount.getMinActiveReplica();
+      int replicaGapCount = minActiveReplica - activeReplicas;
+      if (replicaGapCount <= 0) {
+        // delayed rebalance overwrites isn't required, early stop and move on to next partition
+        continue;
+      }
+      // follow the state priority order, add additional replicas to close the gap on replica count
+      Map<String, Integer> stateCountMap = clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
+          .getStateCountMap(minActiveReplica, minActiveReplica);
+      // follow the priority order of states and prepare additional replicas to be assigned
+      for (String state : priorityOrderedStates) {
+        if (replicaGapCount <= 0) {
+          break;
+        }
+        int priority = statePriorityMap.get(state);
+        int curActiveStateCount = activeStateReplicaCount.getOrDefault(state, 0);
+        for (int i = 0; i < stateCountMap.get(state) - curActiveStateCount && replicaGapCount > 0; i++) {
+          toBeAssignedReplicas.add(
+              new AssignableReplica(clusterData.getClusterConfig(), resourceConfig, partitionName, state, priority));
+          replicaGapCount--;
+        }
+      }
+    }
+    LOG.info("Replicas: {} need to be brought up for rebalance overwrite.", toBeAssignedReplicas);
+    return toBeAssignedReplicas;
+  }
+
+  private static List<String> getPriorityOrderedStates(String resourceName, ResourceControllerDataProvider clusterData) {
+    IdealState currentIdealState = clusterData.getIdealState(resourceName);
+    Map<String, Integer> statePriorityMap =
+        clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
+
+    List<String> priorityOrderedStates = new ArrayList<>(statePriorityMap.keySet());
+    priorityOrderedStates.sort(Comparator.comparing(a -> statePriorityMap.getOrDefault(a, Integer.MAX_VALUE)));
+    return priorityOrderedStates;

Review Comment:
     public List<String> getStatesPriorityList();
   
   We have this API in the state model definition. You don't need to write this.
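
To make the role of the priority-ordered state list concrete, here is a minimal sketch of the gap-filling loop from the diff above, written against plain Java collections rather than Helix classes. The class and method names (`ReplicaGapSketch`, `statesToBringUp`) are hypothetical, and the priority list is passed in as a parameter; in Helix it would presumably come from the `getStatesPriorityList()` API the reviewer mentions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplicaGapSketch {
  /**
   * Returns the states of the additional replicas to bring up, highest priority first.
   * All inputs are plain collections standing in for Helix objects (an assumption).
   */
  public static List<String> statesToBringUp(List<String> priorityOrderedStates,
      Map<String, Integer> targetStateCount, Map<String, Integer> activeStateCount,
      int minActiveReplica) {
    // Total replicas currently on live and enabled instances.
    int active = activeStateCount.values().stream().mapToInt(Integer::intValue).sum();
    int gap = minActiveReplica - active;
    List<String> result = new ArrayList<>();
    for (String state : priorityOrderedStates) {
      if (gap <= 0) {
        break;
      }
      int missing = targetStateCount.getOrDefault(state, 0)
          - activeStateCount.getOrDefault(state, 0);
      // Add replicas in this state until its target or the overall gap is met.
      for (int i = 0; i < missing && gap > 0; i++) {
        result.add(state);
        gap--;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Integer> target = new HashMap<>();
    target.put("MASTER", 1);
    target.put("SLAVE", 1);
    Map<String, Integer> active = new HashMap<>();
    active.put("SLAVE", 1);
    // One SLAVE is active and minActiveReplica is 2, so one MASTER is missing.
    System.out.println(
        statesToBringUp(Arrays.asList("MASTER", "SLAVE", "OFFLINE"), target, active, 2));
    // prints [MASTER]
  }
}
```

Because the loop only consumes an ordered list, the ordering can come from any source, which is why a hand-written sort helper is unnecessary when the state model definition already provides one.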



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org


[GitHub] [helix] qqu0127 commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1166014847


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java:
##########
@@ -53,7 +54,31 @@ private enum RebalanceScopeType {
     // changes.
     GLOBAL_BASELINE,
     // Set the rebalance scope to cover only replicas that are assigned to downed instances.
-    EMERGENCY
+    EMERGENCY,
+    // A temporary overwrites for partition replicas on downed instance but still within the delayed window but missing
+    // minActiveReplicas
+    DELAYED_REBALANCE_OVERWRITES
+  }
+
+  /**
+   * TODO: On integration with WAGED, have to integrate with counter and latency metrics -- qqu
+   * Compute a new Cluster Model with scope limited to partitions with best possible assignment missing minActiveReplicas
+   * because of delayed rebalance setting.
+   * @param dataProvider The controller's data cache
+   * @param resourceMap The full map of the resource by name
+   * @param activeInstances The active instances that will be used in the calculation.
+   * @param resourceAssignment The resource assignment state to compute on. This should be the current state assignment;
+   *                           if it's run right after another rebalance calculation, the best possible assignment from
+   *                           previous result can be used.
+   * @return the ClusterModel
+   */
+  public static ClusterModel generateClusterModelForDelayedRebalanceOverwrites(

Review Comment:
   First, yes, the integration will be in part 2.
   
   I see what you mean. The way I see it, there is another layer of mapping between the rebalance process and clusterModel, so the clusterModel technically can be used by multiple flows. So I think it's OK to have a separate entry for it.
   Also, it will not be calculated with partial rebalance; this has to happen in emergency, as it is urgent and has to be in the main thread.





[GitHub] [helix] junkaixue commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "junkaixue (via GitHub)" <gi...@apache.org>.
junkaixue commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1166090507


##########
helix-core/src/main/java/org/apache/helix/model/PartitionWithReplicaCount.java:
##########
@@ -0,0 +1,69 @@
+package org.apache.helix.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Objects;
+
+
+/**
+ * A POJO class containing {@link Partition} with missing replicas.
+ */
+public class PartitionWithReplicaCount {

Review Comment:
   I got your point, but we still do not need to pass the object around, so creating a class just for a small step of intermediate computation would be a bit of overkill. I highly recommend not having this class.





[GitHub] [helix] qqu0127 commented on pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on PR #2444:
URL: https://github.com/apache/helix/pull/2444#issuecomment-1511491592

   This PR is ready to merge, approved by @junkaixue and @desaikomal 
   Commit message:
   Implement new code path to handle "rebalance overwrites" that honors WAGED hard constraints 
   The rebalance overwrite is a special mechanism to handle minActiveReplica during delayed rebalance. Additional replicas might be brought up in an instance that violates the WAGED hard constraint. This commit implements new code logic for it that honors the hard constraints. The new code is NOT yet integrated to existing flow.




[GitHub] [helix] qqu0127 commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "qqu0127 (via GitHub)" <gi...@apache.org>.
qqu0127 commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1167187498


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +287,156 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * @param clusterData Cluster data cache.
+   * @param replicaMap A set of assignable replicas by resource name.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.
+   * @return The replicas that need to be assigned.
+   */
+  public static Set<AssignableReplica> findToBeAssignedReplicasForMinActiveReplica(
+      ResourceControllerDataProvider clusterData,
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Set<String> liveEnabledInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Map<String, List<PartitionWithReplicaCount>> partitionsMissingMinActiveReplicas =
+        getPartitionsNeedForRebalanceOverwrites(clusterData, currentAssignment);
+    if (partitionsMissingMinActiveReplicas.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    for (String resourceName : replicaMap.keySet()) {
+      // <partition, <state, instances set>>
+      Map<String, Map<String, Set<String>>> stateInstanceMap =
+          ClusterModelProvider.getStateInstanceMap(currentAssignment.get(resourceName));
+      ResourceAssignment resourceAssignment = currentAssignment.get(resourceName);
+      String modelDef = clusterData.getIdealState(resourceName).getStateModelDefRef();
+      Map<String, Integer> statePriorityMap = clusterData.getStateModelDef(modelDef).getStatePriorityMap();
+      // keep all current assignment and add to allocated replicas
+      resourceAssignment.getMappedPartitions().forEach(partition ->
+          resourceAssignment.getReplicaMap(partition).forEach((instance, state) ->
+              allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>())
+                  .add(new AssignableReplica(clusterData.getClusterConfig(), clusterData.getResourceConfig(resourceName),
+                      partition.getPartitionName(), state, statePriorityMap.get(state)))));
+      // only proceed for resource requiring delayed rebalance overwrites

Review Comment:
   Still in delayed window. Permanently downed instances are processed separately with the "emergency scope"
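
The check that selects partitions for this scope, as in `findPartitionsMissingMinActiveReplica` earlier in the thread, can be sketched with plain collections. This is an illustration only: `MinActiveReplicaSketch` and `partitionsBelowMinActive` are hypothetical names, and partitions and instances are represented as strings rather than Helix objects.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MinActiveReplicaSketch {
  /**
   * Returns the partitions whose replica count on live and enabled instances
   * falls below min(minActiveReplica, numReplica), sorted by name.
   *
   * @param placement partition name -> instances currently hosting a replica
   */
  public static List<String> partitionsBelowMinActive(Map<String, Set<String>> placement,
      Set<String> liveEnabledInstances, int minActiveReplica, int numReplica) {
    int required = Math.min(minActiveReplica, numReplica);
    return placement.entrySet().stream()
        // Count only replicas on instances that are live and enabled.
        .filter(e -> e.getValue().stream().filter(liveEnabledInstances::contains).count()
            < required)
        .map(Map.Entry::getKey)
        .sorted()
        .collect(Collectors.toList());
  }
}
```

Replicas on instances outside the live and enabled set do not count toward the threshold, so a partition with an instance inside the delayed window still qualifies here.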





[GitHub] [helix] desaikomal commented on a diff in pull request #2444: WAGED rebalance overwrite redesign -- part 1

Posted by "desaikomal (via GitHub)" <gi...@apache.org>.
desaikomal commented on code in PR #2444:
URL: https://github.com/apache/helix/pull/2444#discussion_r1167245946


##########
helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java:
##########
@@ -279,4 +286,168 @@ public static void setRebalanceScheduler(String resourceName, boolean isDelayedR
       }
     }
   }
+
+  /**
+   * Computes the partition replicas that needs to be brought up to satisfy minActiveReplicas while downed instances
+   * are within the delayed window.
+   * Keep all current assignment with their current allocation.
+   * NOTE: This method also populates allocatedReplicas as it goes through all resources to preserve current allocation.
+   *
+   * @param clusterData Cluster data cache.
+   * @param resources A set all resource names.
+   * @param liveEnabledInstances The set of live and enabled instances.
+   * @param currentAssignment Current assignment by resource name.
+   * @param allocatedReplicas The map from instance name to assigned replicas, the map is populated in this method.

Review Comment:
   Is it updated or populated?
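
For context on this question: the earlier revision of the method quoted in this thread adds to `allocatedReplicas` through `computeIfAbsent`, which keeps entries already present in the map and only creates a set for instances not seen before. A minimal sketch of that behavior, using strings in place of `AssignableReplica` (an assumption for brevity; `AllocatedReplicasSketch` and `record` are hypothetical names):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class AllocatedReplicasSketch {
  /**
   * Adds a replica under the given instance. An existing set is extended in place;
   * a new set is created only when the instance has no entry yet.
   */
  public static void record(Map<String, Set<String>> allocatedReplicas, String instance,
      String replica) {
    allocatedReplicas.computeIfAbsent(instance, key -> new HashSet<>()).add(replica);
  }
}
```

Under this pattern, a caller passing a non-empty map would see it updated, and a caller passing an empty map would see it populated. Whether the later revision under review behaves the same way is not shown in the quoted hunk.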


