You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2021/06/01 18:46:26 UTC
[helix] branch master updated: Add a Waged rebalancer util api that
do not need raw zk address (#1756)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new c93a7e3 Add a Waged rebalancer util api that do not need raw zk address (#1756)
c93a7e3 is described below
commit c93a7e33604fbb2efeb013986fb6a6411d841ad2
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Tue Jun 1 11:46:18 2021 -0700
Add a Waged rebalancer util api that do not need raw zk address (#1756)
---
.../rebalancer/waged/ReadOnlyWagedRebalancer.java | 13 ++----
.../helix/manager/zk/ZkBucketDataAccessor.java | 42 +++++++++--------
.../BestPossibleExternalViewVerifier.java | 3 +-
.../main/java/org/apache/helix/util/HelixUtil.java | 54 +++++++++++++++++-----
4 files changed, 72 insertions(+), 40 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index 80b62ee..e94148e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -23,16 +23,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
-import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
@@ -45,9 +41,9 @@ import org.apache.helix.model.ResourceAssignment;
* This class is to be used in the cluster verifiers, tests, or util methods.
*/
public class ReadOnlyWagedRebalancer extends WagedRebalancer {
- public ReadOnlyWagedRebalancer(String metadataStoreAddress, String clusterName,
+ public ReadOnlyWagedRebalancer(ZkBucketDataAccessor zkBucketDataAccessor, String clusterName,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
- super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddress, clusterName),
+ super(new ReadOnlyAssignmentMetadataStore(zkBucketDataAccessor, clusterName),
ConstraintBasedAlgorithmFactory.getInstance(preferences), Optional.empty());
}
@@ -65,8 +61,9 @@ public class ReadOnlyWagedRebalancer extends WagedRebalancer {
}
private static class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
- ReadOnlyAssignmentMetadataStore(String metadataStoreAddress, String clusterName) {
- super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName);
+
+ ReadOnlyAssignmentMetadataStore(ZkBucketDataAccessor zkBucketDataAccessor, String clusterName) {
+ super(zkBucketDataAccessor, clusterName);
}
@Override
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index d7c5502..2f35994 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -83,25 +83,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
* @param versionTTLms in ms
*/
public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTLms) {
- if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddr == null) {
- LOG.warn(
- "ZkBucketDataAccessor: either multi-zk enabled or zkAddr is null - "
- + "starting ZkBucketDataAccessor in multi-zk mode!");
- try {
- // Create realm-aware ZkClient.
- RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
- new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
- RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
- new RealmAwareZkClient.RealmAwareZkClientConfig();
- _zkClient = new FederatedZkClient(connectionConfig, clientConfig);
- } catch (IllegalArgumentException | InvalidRoutingDataException e) {
- throw new HelixException("Not able to connect on realm-aware mode", e);
- }
- } else {
- _zkClient = DedicatedZkClientFactory.getInstance()
- .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
- }
-
+ _zkClient = createRealmAwareZkClient(zkAddr);
_zkClient.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
@@ -130,6 +112,28 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
}
+ private static RealmAwareZkClient createRealmAwareZkClient(String zkAddr) {
+ RealmAwareZkClient zkClient;
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddr == null) {
+ LOG.warn("ZkBucketDataAccessor: either multi-zk enabled or zkAddr is null - "
+ + "starting ZkBucketDataAccessor in multi-zk mode!");
+ try {
+ // Create realm-aware ZkClient.
+ RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+ new RealmAwareZkClient.RealmAwareZkClientConfig();
+ zkClient = new FederatedZkClient(connectionConfig, clientConfig);
+ } catch (IllegalArgumentException | InvalidRoutingDataException e) {
+ throw new HelixException("Not able to connect on realm-aware mode", e);
+ }
+ } else {
+ zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+ }
+ return zkClient;
+ }
+
@Override
public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath, T value)
throws IOException {
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index f916583..eaab1a2 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -43,6 +43,7 @@ import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -463,7 +464,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
private class DryrunWagedRebalancer extends org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer {
public DryrunWagedRebalancer(String metadataStoreAddress, String clusterName,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
- super(metadataStoreAddress, clusterName, preferences);
+ super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName, preferences);
}
@Override
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 413305c..ce22c5f 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -51,6 +51,7 @@ import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
@@ -58,7 +59,6 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
@@ -188,7 +188,33 @@ public final class HelixUtil {
String metadataStoreAddress, ClusterConfig clusterConfig,
List<InstanceConfig> instanceConfigs, List<String> liveInstances,
List<IdealState> idealStates, List<ResourceConfig> resourceConfigs) {
- return getAssignmentForWagedFullAutoImpl(metadataStoreAddress, clusterConfig,
+ BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(metadataStoreAddress);
+ Map<String, ResourceAssignment> result =
+ getAssignmentForWagedFullAutoImpl(new ZkBucketDataAccessor(metadataStoreAddress),
+ baseDataAccessor, clusterConfig, instanceConfigs, liveInstances, idealStates,
+ resourceConfigs, true); // usePrefLists: target assignment is preference-list based
+ baseDataAccessor.close();
+ return result;
+ }
+
+ /**
+ * Returns the expected ideal ResourceAssignments for the given resources in the cluster
+ * calculated using the read-only WAGED rebalancer. The returned result is based on preference
+ * lists, which is the target stable assignment.
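+ * @param zkBucketDataAccessor bucket data accessor handed to the read-only WAGED rebalancer
+ * @param baseDataAccessor base data accessor backing the HelixDataAccessor used for the refresh
+ * @param clusterConfig cluster config; a copy with synchronous global rebalance is used
+ * @param instanceConfigs instance configs to use for the calculation
+ * @param liveInstances names of the live instances to use for the calculation
+ * @param idealStates ideal states of the resources to compute assignments for
+ * @param resourceConfigs resource configs to use for the calculation
+ * @return the computed ResourceAssignments (presumably keyed by resource name)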
+ */
+ public static Map<String, ResourceAssignment> getTargetAssignmentForWagedFullAuto(
+ ZkBucketDataAccessor zkBucketDataAccessor, BaseDataAccessor<ZNRecord> baseDataAccessor,
+ ClusterConfig clusterConfig, List<InstanceConfig> instanceConfigs, List<String> liveInstances,
+ List<IdealState> idealStates, List<ResourceConfig> resourceConfigs) {
+ return getAssignmentForWagedFullAutoImpl(zkBucketDataAccessor, baseDataAccessor, clusterConfig,
instanceConfigs, liveInstances, idealStates, resourceConfigs, true);
}
@@ -210,8 +236,13 @@ public final class HelixUtil {
String metadataStoreAddress, ClusterConfig clusterConfig,
List<InstanceConfig> instanceConfigs, List<String> liveInstances,
List<IdealState> idealStates, List<ResourceConfig> resourceConfigs) {
- return getAssignmentForWagedFullAutoImpl(metadataStoreAddress, clusterConfig,
- instanceConfigs, liveInstances, idealStates, resourceConfigs, false);
+ BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(metadataStoreAddress);
+ Map<String, ResourceAssignment> result =
+ getAssignmentForWagedFullAutoImpl(new ZkBucketDataAccessor(metadataStoreAddress),
+ baseDataAccessor, clusterConfig, instanceConfigs, liveInstances, idealStates,
+ resourceConfigs, false);
+ baseDataAccessor.close();
+ return result;
}
/*
@@ -220,9 +251,10 @@ public final class HelixUtil {
* preference lists.
*/
private static Map<String, ResourceAssignment> getAssignmentForWagedFullAutoImpl(
- String metadataStoreAddress, ClusterConfig clusterConfig,
- List<InstanceConfig> instanceConfigs, List<String> liveInstances,
+ ZkBucketDataAccessor zkBucketDataAccessor, BaseDataAccessor<ZNRecord> baseDataAccessor,
+ ClusterConfig clusterConfig, List<InstanceConfig> instanceConfigs, List<String> liveInstances,
List<IdealState> idealStates, List<ResourceConfig> resourceConfigs, boolean usePrefLists) {
+
// Copy the cluster config and make globalRebalance happen synchronously
// Otherwise, globalRebalance may not complete and this util might end up returning
// an empty assignment.
@@ -230,13 +262,12 @@ public final class HelixUtil {
globalSyncClusterConfig.setGlobalRebalanceAsyncMode(false);
// Prepare a data accessor for a dataProvider (cache) refresh
- BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(metadataStoreAddress);
HelixDataAccessor helixDataAccessor =
new ZKHelixDataAccessor(globalSyncClusterConfig.getClusterName(), baseDataAccessor);
// Create an instance of read-only WAGED rebalancer
ReadOnlyWagedRebalancer readOnlyWagedRebalancer =
- new ReadOnlyWagedRebalancer(metadataStoreAddress, globalSyncClusterConfig.getClusterName(),
+ new ReadOnlyWagedRebalancer(zkBucketDataAccessor, globalSyncClusterConfig.getClusterName(),
globalSyncClusterConfig.getGlobalRebalancePreference());
// Use a dummy event to run the required stages for BestPossibleState calculation
@@ -295,7 +326,6 @@ public final class HelixUtil {
LOG.error("getIdealAssignmentForWagedFullAuto(): Failed to compute ResourceAssignments!", e);
} finally {
// Close all ZK connections
- baseDataAccessor.close();
readOnlyWagedRebalancer.close();
}
@@ -317,9 +347,9 @@ public final class HelixUtil {
for (String partitionName : idealState.getPartitionSet()) {
Partition partition = new Partition(partitionName);
if (usePrefLists) {
- resourceAssignment.addReplicaMap(partition, computeIdealMapping(
- output.getPreferenceList(resourceName, partitionName),
- stateModelDefinition, new HashSet<>(liveInstances)));
+ resourceAssignment.addReplicaMap(partition,
+ computeIdealMapping(output.getPreferenceList(resourceName, partitionName),
+ stateModelDefinition, new HashSet<>(liveInstances)));
} else {
resourceAssignment.addReplicaMap(partition, partitionStateMap.getPartitionMap(partition));
}