You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/06/10 22:17:26 UTC

[helix] branch master updated: Fix ReadOnlyWagedRebalancer so that it computes mapping from scratch (#1058)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new cadc17f  Fix ReadOnlyWagedRebalancer so that it computes mapping from scratch (#1058)
cadc17f is described below

commit cadc17f519a7b1cd8984e1553c98e6a022ffda7c
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Wed Jun 10 15:17:16 2020 -0700

    Fix ReadOnlyWagedRebalancer so that it computes mapping from scratch (#1058)
    
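    Previously, ReadOnlyWagedRebalancer would only read the previously computed best possible mapping and return it. This commit changes it so that it computes the mapping from scratch: it may still read the previously computed best possible mapping, but it no longer returns that mapping without doing any calculation. BestPossibleExternalViewVerifier, which still needs the old read-only behavior, now gets it from a private DryrunWagedRebalancer subclass.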
---
 .../rebalancer/waged/ReadOnlyWagedRebalancer.java  |  9 ----
 .../BestPossibleExternalViewVerifier.java          | 30 ++++++++++---
 .../main/java/org/apache/helix/util/HelixUtil.java | 52 ++++++++++++++++++----
 .../WagedRebalancer/TestWagedRebalance.java        | 26 +++++++++++
 4 files changed, 95 insertions(+), 22 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index eccb175..d9816c2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -48,15 +48,6 @@ public class ReadOnlyWagedRebalancer extends WagedRebalancer {
         ConstraintBasedAlgorithmFactory.getInstance(preferences), Optional.empty());
   }
 
-  @Override
-  protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
-      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
-      Set<String> activeNodes, CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
-      throws HelixRebalanceException {
-    return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput,
-        resourceMap.keySet());
-  }
-
   private static class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
     ReadOnlyAssignmentMetadataStore(String metadataStoreAddress, String clusterName) {
       super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName);
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 6fc833b..7ea9a05 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -30,21 +30,25 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
+import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.util.RebalanceUtil;
@@ -408,14 +412,14 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
 
     RebalanceUtil.runStage(event, new CurrentStateComputationStage());
     // Note the readOnlyWagedRebalancer is just for one time usage
-    ReadOnlyWagedRebalancer readOnlyWagedRebalancer =
-        new ReadOnlyWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
+    DryrunWagedRebalancer dryrunWagedRebalancer =
+        new DryrunWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
             cache.getClusterConfig().getGlobalRebalancePreference());
-    event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), readOnlyWagedRebalancer);
+    event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer);
     try {
       RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
     } finally {
-      readOnlyWagedRebalancer.close();
+      dryrunWagedRebalancer.close();
     }
 
     BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
@@ -428,4 +432,37 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     return verifierName + "(" + _clusterName + "@" + _zkClient + "@resources["
        + (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
   }
+
+  /**
+   * One-shot rebalancer that lets this verifier reuse the previously computed best possible
+   * mapping instead of recomputing it.
+   *
+   * <p>{@link org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer} now computes
+   * the assignment from scratch; this subclass restores the read-only behavior by overriding
+   * {@code computeBestPossibleAssignment} to read the stored mapping from the assignment
+   * metadata store.
+   *
+   * <p>Declared {@code static} because it uses no state of the enclosing verifier; a non-static
+   * inner class would needlessly hold a hidden reference to it.
+   */
+  private static class DryrunWagedRebalancer
+      extends org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer {
+    public DryrunWagedRebalancer(String metadataStoreAddress, String clusterName,
+        Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+      super(metadataStoreAddress, clusterName, preferences);
+    }
+
+    /**
+     * Returns the stored best possible assignment for the resources in {@code resourceMap}
+     * without running {@code algorithm}; {@code clusterData} and {@code activeNodes} are unused.
+     */
+    @Override
+    protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
+        ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+        Set<String> activeNodes, CurrentStateOutput currentStateOutput,
+        RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+      return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput,
+          resourceMap.keySet());
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 348ce07..c1d5ad0 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -28,12 +28,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Joiner;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.PropertyType;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -175,31 +177,57 @@ public final class HelixUtil {
       String metadataStoreAddress, ClusterConfig clusterConfig,
       List<InstanceConfig> instanceConfigs, List<String> liveInstances,
       List<IdealState> idealStates, List<ResourceConfig> resourceConfigs) {
+    // Copy the cluster config and make globalRebalance happen synchronously
+    // Otherwise, globalRebalance may not complete and this util might end up returning
+    // an empty assignment.
+    ClusterConfig globalSyncClusterConfig = new ClusterConfig(clusterConfig.getRecord());
+    globalSyncClusterConfig.setGlobalRebalanceAsyncMode(false);
+
     // Prepare a data accessor for a dataProvider (cache) refresh
     BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(metadataStoreAddress);
     HelixDataAccessor helixDataAccessor =
-        new ZKHelixDataAccessor(clusterConfig.getClusterName(), baseDataAccessor);
+        new ZKHelixDataAccessor(globalSyncClusterConfig.getClusterName(), baseDataAccessor);
 
     // Create an instance of read-only WAGED rebalancer
     ReadOnlyWagedRebalancer readOnlyWagedRebalancer =
-        new ReadOnlyWagedRebalancer(metadataStoreAddress, clusterConfig.getClusterName(),
-            clusterConfig.getGlobalRebalancePreference());
+        new ReadOnlyWagedRebalancer(metadataStoreAddress, globalSyncClusterConfig.getClusterName(),
+            globalSyncClusterConfig.getGlobalRebalancePreference());
 
     // Use a dummy event to run the required stages for BestPossibleState calculation
     // Attributes RESOURCES and RESOURCES_TO_REBALANCE are populated in ResourceComputationStage
-    ClusterEvent event = new ClusterEvent(clusterConfig.getClusterName(), ClusterEventType.Unknown);
+    ClusterEvent event =
+        new ClusterEvent(globalSyncClusterConfig.getClusterName(), ClusterEventType.Unknown);
 
     try {
       // Obtain a refreshed dataProvider (cache) and overwrite cluster parameters with the given parameters
       ResourceControllerDataProvider dataProvider =
-          new ResourceControllerDataProvider(clusterConfig.getClusterName());
+          new ResourceControllerDataProvider(globalSyncClusterConfig.getClusterName());
       dataProvider.requireFullRefresh();
       dataProvider.refresh(helixDataAccessor);
-      dataProvider.setClusterConfig(clusterConfig);
+      dataProvider.setClusterConfig(globalSyncClusterConfig);
       dataProvider.setInstanceConfigMap(instanceConfigs.stream()
           .collect(Collectors.toMap(InstanceConfig::getInstanceName, Function.identity())));
-      dataProvider.setLiveInstances(
-          liveInstances.stream().map(LiveInstance::new).collect(Collectors.toList()));
+      // For LiveInstances, we must preserve the existing session IDs
+      // So read LiveInstance objects from the cluster and do a "retainAll" on them
+      // liveInstanceMap is an unmodifiableMap instances, so we filter using a stream
+      Map<String, LiveInstance> liveInstanceMap = dataProvider.getLiveInstances();
+      List<LiveInstance> filteredLiveInstances = liveInstanceMap.entrySet().stream()
+          .filter(entry -> liveInstances.contains(entry.getKey())).map(Map.Entry::getValue)
+          .collect(Collectors.toList());
+      // Synthetically create LiveInstance objects that are passed in as the parameter
+      // First, determine which new LiveInstance objects need to be created
+      List<String> liveInstanceList = new ArrayList<>(liveInstances);
+      liveInstanceList.removeAll(filteredLiveInstances.stream().map(LiveInstance::getInstanceName)
+          .collect(Collectors.toList()));
+      liveInstanceList.forEach(liveInstanceName -> {
+        // Create a new LiveInstance object and give it a random UUID as a session ID
+        LiveInstance newLiveInstanceObj = new LiveInstance(liveInstanceName);
+        newLiveInstanceObj.getRecord()
+            .setSimpleField(LiveInstance.LiveInstanceProperty.SESSION_ID.name(),
+                UUID.randomUUID().toString().replace("-", ""));
+        filteredLiveInstances.add(newLiveInstanceObj);
+      });
+      dataProvider.setLiveInstances(new ArrayList<>(filteredLiveInstances));
       dataProvider.setIdealStates(idealStates);
       dataProvider.setResourceConfigMap(resourceConfigs.stream()
           .collect(Collectors.toMap(ResourceConfig::getResourceName, Function.identity())));
@@ -222,8 +250,16 @@ public final class HelixUtil {
     // Convert the resulting BestPossibleStateOutput to Map<String, ResourceAssignment>
     Map<String, ResourceAssignment> result = new HashMap<>();
     BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+    if (output == null) {
+      throw new HelixException(
+          "getIdealAssignmentForWagedFullAuto(): Calculation failed: Failed to compute BestPossibleState!");
+    }
     Map<String, Resource> resourceMap =
         event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
+    if (resourceMap == null) {
+      throw new HelixException(
+          "getIdealAssignmentForWagedFullAuto(): Calculation failed: RESOURCES_TO_REBALANCE is null!");
+    }
     for (Resource resource : resourceMap.values()) {
       String resourceName = resource.getResourceName();
       PartitionStateMap partitionStateMap = output.getPartitionStateMap(resourceName);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 2522696..e700524 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -26,6 +26,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.ConfigAccessor;
@@ -193,6 +195,30 @@ public class TestWagedRebalance extends ZkTestBase {
       Assert.assertEquals(utilResult.get(idealState.getResourceName()).getRecord().getMapFields(),
           idealState.getRecord().getMapFields());
     }
+
+    // Try to add a few extra instances
+    String instance_0 = "instance_0";
+    String instance_1 = "instance_1";
+    Set<String> newInstances = new HashSet<>();
+    newInstances.add(instance_0);
+    newInstances.add(instance_1);
+    liveInstances.addAll(newInstances);
+    for (String instance : newInstances) {
+      InstanceConfig instanceConfig = new InstanceConfig(instance);
+      instanceConfigs.add(instanceConfig);
+    }
+
+    utilResult = HelixUtil
+        .getIdealAssignmentForWagedFullAuto(ZK_ADDR, clusterConfig, instanceConfigs, liveInstances,
+            idealStates, resourceConfigs);
+
+    Set<String> instancesWithAssignments = new HashSet<>();
+    utilResult.values().forEach(
+        resourceAssignment -> resourceAssignment.getRecord().getMapFields().values()
+            .forEach(entry -> instancesWithAssignments.addAll(entry.keySet())));
+    // The newly added instances should contain some partitions
+    Assert.assertTrue(instancesWithAssignments.contains(instance_0));
+    Assert.assertTrue(instancesWithAssignments.contains(instance_1));
   }
 
   @Test(dependsOnMethods = "test")