You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/06/01 18:46:26 UTC

[helix] branch master updated: Add a Waged rebalancer util api that do not need raw zk address (#1756)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new c93a7e3  Add a Waged rebalancer util api that do not need raw zk address (#1756)
c93a7e3 is described below

commit c93a7e33604fbb2efeb013986fb6a6411d841ad2
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Tue Jun 1 11:46:18 2021 -0700

    Add a Waged rebalancer util api that do not need raw zk address (#1756)
---
 .../rebalancer/waged/ReadOnlyWagedRebalancer.java  | 13 ++----
 .../helix/manager/zk/ZkBucketDataAccessor.java     | 42 +++++++++--------
 .../BestPossibleExternalViewVerifier.java          |  3 +-
 .../main/java/org/apache/helix/util/HelixUtil.java | 54 +++++++++++++++++-----
 4 files changed, 72 insertions(+), 40 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index 80b62ee..e94148e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -23,16 +23,12 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
-import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 
 
@@ -45,9 +41,9 @@ import org.apache.helix.model.ResourceAssignment;
  * This class is to be used in the cluster verifiers, tests, or util methods.
  */
 public class ReadOnlyWagedRebalancer extends WagedRebalancer {
-  public ReadOnlyWagedRebalancer(String metadataStoreAddress, String clusterName,
+  public ReadOnlyWagedRebalancer(ZkBucketDataAccessor zkBucketDataAccessor, String clusterName,
       Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
-    super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddress, clusterName),
+    super(new ReadOnlyAssignmentMetadataStore(zkBucketDataAccessor, clusterName),
         ConstraintBasedAlgorithmFactory.getInstance(preferences), Optional.empty());
   }
 
@@ -65,8 +61,9 @@ public class ReadOnlyWagedRebalancer extends WagedRebalancer {
   }
 
   private static class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
-    ReadOnlyAssignmentMetadataStore(String metadataStoreAddress, String clusterName) {
-      super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName);
+
+    ReadOnlyAssignmentMetadataStore(ZkBucketDataAccessor zkBucketDataAccessor, String clusterName) {
+      super(zkBucketDataAccessor, clusterName);
     }
 
     @Override
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index d7c5502..2f35994 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -83,25 +83,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
    * @param versionTTLms in ms
    */
   public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTLms) {
-    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddr == null) {
-      LOG.warn(
-          "ZkBucketDataAccessor: either multi-zk enabled or zkAddr is null - "
-              + "starting ZkBucketDataAccessor in multi-zk mode!");
-      try {
-        // Create realm-aware ZkClient.
-        RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
-            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
-        RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
-            new RealmAwareZkClient.RealmAwareZkClientConfig();
-        _zkClient = new FederatedZkClient(connectionConfig, clientConfig);
-      } catch (IllegalArgumentException | InvalidRoutingDataException e) {
-        throw new HelixException("Not able to connect on realm-aware mode", e);
-      }
-    } else {
-      _zkClient = DedicatedZkClientFactory.getInstance()
-          .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-    }
-
+    _zkClient = createRealmAwareZkClient(zkAddr);
     _zkClient.setZkSerializer(new ZkSerializer() {
       @Override
       public byte[] serialize(Object data) throws ZkMarshallingError {
@@ -130,6 +112,28 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
     this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
   }
 
+  private static RealmAwareZkClient createRealmAwareZkClient(String zkAddr) {
+    RealmAwareZkClient zkClient;
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddr == null) {
+      LOG.warn("ZkBucketDataAccessor: either multi-zk enabled or zkAddr is null - "
+          + "starting ZkBucketDataAccessor in multi-zk mode!");
+      try {
+        // Create realm-aware ZkClient.
+        RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+        RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+            new RealmAwareZkClient.RealmAwareZkClientConfig();
+        zkClient = new FederatedZkClient(connectionConfig, clientConfig);
+      } catch (IllegalArgumentException | InvalidRoutingDataException e) {
+        throw new HelixException("Not able to connect on realm-aware mode", e);
+      }
+    } else {
+      zkClient = DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    }
+    return zkClient;
+  }
+
   @Override
   public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath, T value)
       throws IOException {
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index f916583..eaab1a2 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -43,6 +43,7 @@ import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -463,7 +464,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
   private class DryrunWagedRebalancer extends org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer {
     public DryrunWagedRebalancer(String metadataStoreAddress, String clusterName,
         Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
-      super(metadataStoreAddress, clusterName, preferences);
+      super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName, preferences);
     }
 
     @Override
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 413305c..ce22c5f 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -51,6 +51,7 @@ import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -58,7 +59,6 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
@@ -188,7 +188,33 @@ public final class HelixUtil {
       String metadataStoreAddress, ClusterConfig clusterConfig,
       List<InstanceConfig> instanceConfigs, List<String> liveInstances,
       List<IdealState> idealStates, List<ResourceConfig> resourceConfigs) {
-    return getAssignmentForWagedFullAutoImpl(metadataStoreAddress, clusterConfig,
+    BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(metadataStoreAddress);
+    Map<String, ResourceAssignment> result =
+        getAssignmentForWagedFullAutoImpl(new ZkBucketDataAccessor(metadataStoreAddress),
+            baseDataAccessor, clusterConfig, instanceConfigs, liveInstances, idealStates,
+            resourceConfigs, false);
+    baseDataAccessor.close();
+    return result;
+  }
+
+  /**
+   * Returns the expected ideal ResourceAssignments for the given resources in the cluster
+   * calculated using the read-only WAGED rebalancer. The returned result is based on preference
+   * lists, which is the target stable assignment.
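+   * @param zkBucketDataAccessor bucket accessor handed to the read-only WAGED rebalancer
+   * @param baseDataAccessor accessor backing the HelixDataAccessor used for the cache refresh
+   * @param clusterConfig cluster config; copied internally with async global rebalance disabled
+   * @param instanceConfigs instance configs to use for the calculation
+   * @param liveInstances live instances to consider for the calculation
+   * @param idealStates ideal states of the resources to compute assignments for
+   * @param resourceConfigs resource configs to use for the calculation
+   * @return the ResourceAssignments computed from the preference lists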
+   */
+  public static Map<String, ResourceAssignment> getTargetAssignmentForWagedFullAuto(
+      ZkBucketDataAccessor zkBucketDataAccessor, BaseDataAccessor<ZNRecord> baseDataAccessor,
+      ClusterConfig clusterConfig, List<InstanceConfig> instanceConfigs, List<String> liveInstances,
+      List<IdealState> idealStates, List<ResourceConfig> resourceConfigs) {
+    return getAssignmentForWagedFullAutoImpl(zkBucketDataAccessor, baseDataAccessor, clusterConfig,
         instanceConfigs, liveInstances, idealStates, resourceConfigs, true);
   }
 
@@ -210,8 +236,13 @@ public final class HelixUtil {
       String metadataStoreAddress, ClusterConfig clusterConfig,
       List<InstanceConfig> instanceConfigs, List<String> liveInstances,
       List<IdealState> idealStates, List<ResourceConfig> resourceConfigs) {
-    return getAssignmentForWagedFullAutoImpl(metadataStoreAddress, clusterConfig,
-        instanceConfigs, liveInstances, idealStates, resourceConfigs, false);
+    BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(metadataStoreAddress);
+    Map<String, ResourceAssignment> result =
+        getAssignmentForWagedFullAutoImpl(new ZkBucketDataAccessor(metadataStoreAddress),
+            baseDataAccessor, clusterConfig, instanceConfigs, liveInstances, idealStates,
+            resourceConfigs, false);
+    baseDataAccessor.close();
+    return result;
   }
 
   /*
@@ -220,9 +251,10 @@ public final class HelixUtil {
    * preference lists.
    */
   private static Map<String, ResourceAssignment> getAssignmentForWagedFullAutoImpl(
-      String metadataStoreAddress, ClusterConfig clusterConfig,
-      List<InstanceConfig> instanceConfigs, List<String> liveInstances,
+      ZkBucketDataAccessor zkBucketDataAccessor, BaseDataAccessor<ZNRecord> baseDataAccessor,
+      ClusterConfig clusterConfig, List<InstanceConfig> instanceConfigs, List<String> liveInstances,
       List<IdealState> idealStates, List<ResourceConfig> resourceConfigs, boolean usePrefLists) {
+
     // Copy the cluster config and make globalRebalance happen synchronously
     // Otherwise, globalRebalance may not complete and this util might end up returning
     // an empty assignment.
@@ -230,13 +262,12 @@ public final class HelixUtil {
     globalSyncClusterConfig.setGlobalRebalanceAsyncMode(false);
 
     // Prepare a data accessor for a dataProvider (cache) refresh
-    BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(metadataStoreAddress);
     HelixDataAccessor helixDataAccessor =
         new ZKHelixDataAccessor(globalSyncClusterConfig.getClusterName(), baseDataAccessor);
 
     // Create an instance of read-only WAGED rebalancer
     ReadOnlyWagedRebalancer readOnlyWagedRebalancer =
-        new ReadOnlyWagedRebalancer(metadataStoreAddress, globalSyncClusterConfig.getClusterName(),
+        new ReadOnlyWagedRebalancer(zkBucketDataAccessor, globalSyncClusterConfig.getClusterName(),
             globalSyncClusterConfig.getGlobalRebalancePreference());
 
     // Use a dummy event to run the required stages for BestPossibleState calculation
@@ -295,7 +326,6 @@ public final class HelixUtil {
       LOG.error("getIdealAssignmentForWagedFullAuto(): Failed to compute ResourceAssignments!", e);
     } finally {
       // Close all ZK connections
-      baseDataAccessor.close();
       readOnlyWagedRebalancer.close();
     }
 
@@ -317,9 +347,9 @@ public final class HelixUtil {
       for (String partitionName : idealState.getPartitionSet()) {
         Partition partition = new Partition(partitionName);
         if (usePrefLists) {
-          resourceAssignment.addReplicaMap(partition, computeIdealMapping(
-              output.getPreferenceList(resourceName, partitionName),
-              stateModelDefinition, new HashSet<>(liveInstances)));
+          resourceAssignment.addReplicaMap(partition,
+              computeIdealMapping(output.getPreferenceList(resourceName, partitionName),
+                  stateModelDefinition, new HashSet<>(liveInstances)));
         } else {
           resourceAssignment.addReplicaMap(partition, partitionStateMap.getPartitionMap(partition));
         }