You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/07/13 22:51:19 UTC

[helix] branch master updated: Allow getIdealAssignmentForWagedFullAuto return preference list based results, add result filtering (#1136)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d73247  Allow getIdealAssignmentForWagedFullAuto return preference list based results, add result filtering (#1136)
3d73247 is described below

commit 3d732479657230ae59bf0354fb057c023a5b1780
Author: Neal Sun <ne...@gmail.com>
AuthorDate: Mon Jul 13 15:51:11 2020 -0700

    Allow getIdealAssignmentForWagedFullAuto return preference list based results, add result filtering (#1136)
    
    We add an option to allow getIdealAssignmentForWagedFullAuto to return an assignment result that's based on the preference lists, which are closer to be the "calculation result" done by Waged. We also filter the result such that only the provided resources (from IdealStates) will have scheduling results returned.
    
    Co-authored-by: Neal Sun <ne...@nesun-mn1.linkedin.biz>
---
 .../main/java/org/apache/helix/util/HelixUtil.java | 64 ++++++++++++++++++----
 .../WagedRebalancer/TestWagedRebalance.java        | 44 +++++++++++++--
 2 files changed, 90 insertions(+), 18 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index c1d5ad0..9cdfacd 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -164,7 +164,8 @@ public final class HelixUtil {
 
   /**
    * Returns the expected ideal ResourceAssignments for the given resources in the cluster
-   * calculated using the read-only WAGED rebalancer.
+   * calculated using the read-only WAGED rebalancer. The returned result is based on preference
+   * lists, which is the target stable assignment.
    * @param metadataStoreAddress
    * @param clusterConfig
    * @param instanceConfigs
@@ -173,10 +174,45 @@ public final class HelixUtil {
    * @param resourceConfigs
    * @return
    */
-  public static Map<String, ResourceAssignment> getIdealAssignmentForWagedFullAuto(
+  public static Map<String, ResourceAssignment> getTargetAssignmentForWagedFullAuto(
       String metadataStoreAddress, ClusterConfig clusterConfig,
       List<InstanceConfig> instanceConfigs, List<String> liveInstances,
       List<IdealState> idealStates, List<ResourceConfig> resourceConfigs) {
+    return getAssignmentForWagedFullAutoImpl(metadataStoreAddress, clusterConfig,
+        instanceConfigs, liveInstances, idealStates, resourceConfigs, true);
+  }
+
+  /**
+   * Returns the expected ideal ResourceAssignments for the given resources in the cluster
+   * calculated using the read-only WAGED rebalancer. The returned result is based on partition
+   * state mapping. which is the immediate assignment. The immediate assignment is different from
+   * the final target assignment; it could be an intermediate state where it contains replicas that
+   * need to be dropped later, for example.
+   * @param metadataStoreAddress
+   * @param clusterConfig
+   * @param instanceConfigs
+   * @param liveInstances
+   * @param idealStates
+   * @param resourceConfigs
+   * @return
+   */
+  public static Map<String, ResourceAssignment> getImmediateAssignmentForWagedFullAuto(
+      String metadataStoreAddress, ClusterConfig clusterConfig,
+      List<InstanceConfig> instanceConfigs, List<String> liveInstances,
+      List<IdealState> idealStates, List<ResourceConfig> resourceConfigs) {
+    return getAssignmentForWagedFullAutoImpl(metadataStoreAddress, clusterConfig,
+        instanceConfigs, liveInstances, idealStates, resourceConfigs, false);
+  }
+
+  /*
+   * If usePrefLists is set to true, the returned assignment is based on preference lists; if
+   * false, the returned assignment is based on partition state mapping, which may differ from
+   * preference lists.
+   */
+  private static Map<String, ResourceAssignment> getAssignmentForWagedFullAutoImpl(
+      String metadataStoreAddress, ClusterConfig clusterConfig,
+      List<InstanceConfig> instanceConfigs, List<String> liveInstances,
+      List<IdealState> idealStates, List<ResourceConfig> resourceConfigs, boolean usePrefLists) {
     // Copy the cluster config and make globalRebalance happen synchronously
     // Otherwise, globalRebalance may not complete and this util might end up returning
     // an empty assignment.
@@ -254,18 +290,22 @@ public final class HelixUtil {
       throw new HelixException(
           "getIdealAssignmentForWagedFullAuto(): Calculation failed: Failed to compute BestPossibleState!");
     }
-    Map<String, Resource> resourceMap =
-        event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
-    if (resourceMap == null) {
-      throw new HelixException(
-          "getIdealAssignmentForWagedFullAuto(): Calculation failed: RESOURCES_TO_REBALANCE is null!");
-    }
-    for (Resource resource : resourceMap.values()) {
-      String resourceName = resource.getResourceName();
+    for (IdealState idealState : idealStates) {
+      String resourceName = idealState.getResourceName();
+      StateModelDefinition stateModelDefinition =
+          BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef())
+              .getStateModelDefinition();
       PartitionStateMap partitionStateMap = output.getPartitionStateMap(resourceName);
       ResourceAssignment resourceAssignment = new ResourceAssignment(resourceName);
-      for (Partition partition : resource.getPartitions()) {
-        resourceAssignment.addReplicaMap(partition, partitionStateMap.getPartitionMap(partition));
+      for (String partitionName : idealState.getPartitionSet()) {
+        Partition partition = new Partition(partitionName);
+        if (usePrefLists) {
+          resourceAssignment.addReplicaMap(partition, computeIdealMapping(
+              output.getPreferenceList(resourceName, partitionName),
+              stateModelDefinition, new HashSet<>(liveInstances)));
+        } else {
+          resourceAssignment.addReplicaMap(partition, partitionStateMap.getPartitionMap(partition));
+        }
       }
       result.put(resourceName, resourceAssignment);
     }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index e700524..173b0cb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -26,8 +26,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.ConfigAccessor;
@@ -47,6 +45,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
@@ -186,13 +185,34 @@ public class TestWagedRebalance extends ZkTestBase {
 
     // Verify that utilResult contains the assignment for the resources added
     Map<String, ResourceAssignment> utilResult = HelixUtil
-        .getIdealAssignmentForWagedFullAuto(ZK_ADDR, clusterConfig, instanceConfigs, liveInstances,
+        .getTargetAssignmentForWagedFullAuto(ZK_ADDR, clusterConfig, instanceConfigs, liveInstances,
             idealStates, resourceConfigs);
     Assert.assertNotNull(utilResult);
-    Assert.assertEquals(utilResult.size(), _allDBs.size());
+    Assert.assertEquals(utilResult.size(), idealStates.size());
     for (IdealState idealState : idealStates) {
       Assert.assertTrue(utilResult.containsKey(idealState.getResourceName()));
-      Assert.assertEquals(utilResult.get(idealState.getResourceName()).getRecord().getMapFields(),
+      StateModelDefinition stateModelDefinition =
+          BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef())
+              .getStateModelDefinition();
+      for (String partition : idealState.getPartitionSet()) {
+        Assert.assertEquals(
+            utilResult.get(idealState.getResourceName()).getRecord().getMapField(partition),
+            HelixUtil
+                .computeIdealMapping(idealState.getPreferenceList(partition), stateModelDefinition,
+                    new HashSet<>(liveInstances)));
+      }
+    }
+
+    // Verify that the partition state mapping mode also works
+    Map<String, ResourceAssignment> paritionMappingBasedResult = HelixUtil
+        .getImmediateAssignmentForWagedFullAuto(ZK_ADDR, clusterConfig, instanceConfigs,
+            liveInstances, idealStates, resourceConfigs);
+    Assert.assertNotNull(paritionMappingBasedResult);
+    Assert.assertEquals(paritionMappingBasedResult.size(), idealStates.size());
+    for (IdealState idealState : idealStates) {
+      Assert.assertTrue(paritionMappingBasedResult.containsKey(idealState.getResourceName()));
+      Assert.assertEquals(
+          paritionMappingBasedResult.get(idealState.getResourceName()).getRecord().getMapFields(),
           idealState.getRecord().getMapFields());
     }
 
@@ -209,7 +229,7 @@ public class TestWagedRebalance extends ZkTestBase {
     }
 
     utilResult = HelixUtil
-        .getIdealAssignmentForWagedFullAuto(ZK_ADDR, clusterConfig, instanceConfigs, liveInstances,
+        .getTargetAssignmentForWagedFullAuto(ZK_ADDR, clusterConfig, instanceConfigs, liveInstances,
             idealStates, resourceConfigs);
 
     Set<String> instancesWithAssignments = new HashSet<>();
@@ -219,6 +239,18 @@ public class TestWagedRebalance extends ZkTestBase {
     // The newly added instances should contain some partitions
     Assert.assertTrue(instancesWithAssignments.contains(instance_0));
     Assert.assertTrue(instancesWithAssignments.contains(instance_1));
+
+    // Perform the same test with immediate assignment
+    utilResult = HelixUtil
+        .getImmediateAssignmentForWagedFullAuto(ZK_ADDR, clusterConfig, instanceConfigs,
+            liveInstances, idealStates, resourceConfigs);
+    Set<String> instancesWithAssignmentsImmediate = new HashSet<>();
+    utilResult.values().forEach(
+        resourceAssignment -> resourceAssignment.getRecord().getMapFields().values()
+            .forEach(entry -> instancesWithAssignmentsImmediate.addAll(entry.keySet())));
+    // The newly added instances should contain some partitions
+    Assert.assertTrue(instancesWithAssignmentsImmediate.contains(instance_0));
+    Assert.assertTrue(instancesWithAssignmentsImmediate.contains(instance_1));
   }
 
   @Test(dependsOnMethods = "test")