You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/06/10 22:17:26 UTC
[helix] branch master updated: Fix ReadOnlyWagedRebalancer so that
it computes mapping from scratch (#1058)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new cadc17f Fix ReadOnlyWagedRebalancer so that it computes mapping from scratch (#1058)
cadc17f is described below
commit cadc17f519a7b1cd8984e1553c98e6a022ffda7c
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Wed Jun 10 15:17:16 2020 -0700
Fix ReadOnlyWagedRebalancer so that it computes mapping from scratch (#1058)
Previously, ReadOnlyWagedRebalancer would only read the previously computed best possible mapping and return it. This commit changes it so that it computes the mapping from scratch: it may still read the previously computed best possible mapping, but it should not simply return it without doing any calculation.
---
.../rebalancer/waged/ReadOnlyWagedRebalancer.java | 9 ----
.../BestPossibleExternalViewVerifier.java | 30 ++++++++++---
.../main/java/org/apache/helix/util/HelixUtil.java | 52 ++++++++++++++++++----
.../WagedRebalancer/TestWagedRebalance.java | 26 +++++++++++
4 files changed, 95 insertions(+), 22 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index eccb175..d9816c2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -48,15 +48,6 @@ public class ReadOnlyWagedRebalancer extends WagedRebalancer {
ConstraintBasedAlgorithmFactory.getInstance(preferences), Optional.empty());
}
- @Override
- protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
- ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
- Set<String> activeNodes, CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
- throws HelixRebalanceException {
- return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput,
- resourceMap.keySet());
- }
-
private static class ReadOnlyAssignmentMetadataStore extends AssignmentMetadataStore {
ReadOnlyAssignmentMetadataStore(String metadataStoreAddress, String clusterName) {
super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName);
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 6fc833b..7ea9a05 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -30,21 +30,25 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixRebalanceException;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
+import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.RebalanceUtil;
@@ -408,14 +412,14 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
RebalanceUtil.runStage(event, new CurrentStateComputationStage());
// Note the readOnlyWagedRebalancer is just for one time usage
- ReadOnlyWagedRebalancer readOnlyWagedRebalancer =
- new ReadOnlyWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
+ DryrunWagedRebalancer dryrunWagedRebalancer =
+ new DryrunWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
cache.getClusterConfig().getGlobalRebalancePreference());
- event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), readOnlyWagedRebalancer);
+ event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer);
try {
RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
} finally {
- readOnlyWagedRebalancer.close();
+ dryrunWagedRebalancer.close();
}
BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
@@ -428,4 +432,20 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
return verifierName + "(" + _clusterName + "@" + _zkClient + "@resources["
+ (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
}
+
+ private class DryrunWagedRebalancer extends org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer {
+ public DryrunWagedRebalancer(String metadataStoreAddress, String clusterName,
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+ super(metadataStoreAddress, clusterName, preferences);
+ }
+
+ @Override
+ protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
+ ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+ Set<String> activeNodes, CurrentStateOutput currentStateOutput,
+ RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+ return getBestPossibleAssignment(getAssignmentMetadataStore(), currentStateOutput,
+ resourceMap.keySet());
+ }
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 348ce07..c1d5ad0 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -28,12 +28,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.base.Joiner;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.PropertyType;
import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -175,31 +177,57 @@ public final class HelixUtil {
String metadataStoreAddress, ClusterConfig clusterConfig,
List<InstanceConfig> instanceConfigs, List<String> liveInstances,
List<IdealState> idealStates, List<ResourceConfig> resourceConfigs) {
+ // Copy the cluster config and make globalRebalance happen synchronously
+ // Otherwise, globalRebalance may not complete and this util might end up returning
+ // an empty assignment.
+ ClusterConfig globalSyncClusterConfig = new ClusterConfig(clusterConfig.getRecord());
+ globalSyncClusterConfig.setGlobalRebalanceAsyncMode(false);
+
// Prepare a data accessor for a dataProvider (cache) refresh
BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(metadataStoreAddress);
HelixDataAccessor helixDataAccessor =
- new ZKHelixDataAccessor(clusterConfig.getClusterName(), baseDataAccessor);
+ new ZKHelixDataAccessor(globalSyncClusterConfig.getClusterName(), baseDataAccessor);
// Create an instance of read-only WAGED rebalancer
ReadOnlyWagedRebalancer readOnlyWagedRebalancer =
- new ReadOnlyWagedRebalancer(metadataStoreAddress, clusterConfig.getClusterName(),
- clusterConfig.getGlobalRebalancePreference());
+ new ReadOnlyWagedRebalancer(metadataStoreAddress, globalSyncClusterConfig.getClusterName(),
+ globalSyncClusterConfig.getGlobalRebalancePreference());
// Use a dummy event to run the required stages for BestPossibleState calculation
// Attributes RESOURCES and RESOURCES_TO_REBALANCE are populated in ResourceComputationStage
- ClusterEvent event = new ClusterEvent(clusterConfig.getClusterName(), ClusterEventType.Unknown);
+ ClusterEvent event =
+ new ClusterEvent(globalSyncClusterConfig.getClusterName(), ClusterEventType.Unknown);
try {
// Obtain a refreshed dataProvider (cache) and overwrite cluster parameters with the given parameters
ResourceControllerDataProvider dataProvider =
- new ResourceControllerDataProvider(clusterConfig.getClusterName());
+ new ResourceControllerDataProvider(globalSyncClusterConfig.getClusterName());
dataProvider.requireFullRefresh();
dataProvider.refresh(helixDataAccessor);
- dataProvider.setClusterConfig(clusterConfig);
+ dataProvider.setClusterConfig(globalSyncClusterConfig);
dataProvider.setInstanceConfigMap(instanceConfigs.stream()
.collect(Collectors.toMap(InstanceConfig::getInstanceName, Function.identity())));
- dataProvider.setLiveInstances(
- liveInstances.stream().map(LiveInstance::new).collect(Collectors.toList()));
+ // For LiveInstances, we must preserve the existing session IDs
+ // So read LiveInstance objects from the cluster and do a "retainAll" on them
+ // liveInstanceMap is an unmodifiable map, so we filter it with a stream instead of modifying it in place
+ Map<String, LiveInstance> liveInstanceMap = dataProvider.getLiveInstances();
+ List<LiveInstance> filteredLiveInstances = liveInstanceMap.entrySet().stream()
+ .filter(entry -> liveInstances.contains(entry.getKey())).map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ // Synthetically create LiveInstance objects that are passed in as the parameter
+ // First, determine which new LiveInstance objects need to be created
+ List<String> liveInstanceList = new ArrayList<>(liveInstances);
+ liveInstanceList.removeAll(filteredLiveInstances.stream().map(LiveInstance::getInstanceName)
+ .collect(Collectors.toList()));
+ liveInstanceList.forEach(liveInstanceName -> {
+ // Create a new LiveInstance object and give it a random UUID as a session ID
+ LiveInstance newLiveInstanceObj = new LiveInstance(liveInstanceName);
+ newLiveInstanceObj.getRecord()
+ .setSimpleField(LiveInstance.LiveInstanceProperty.SESSION_ID.name(),
+ UUID.randomUUID().toString().replace("-", ""));
+ filteredLiveInstances.add(newLiveInstanceObj);
+ });
+ dataProvider.setLiveInstances(new ArrayList<>(filteredLiveInstances));
dataProvider.setIdealStates(idealStates);
dataProvider.setResourceConfigMap(resourceConfigs.stream()
.collect(Collectors.toMap(ResourceConfig::getResourceName, Function.identity())));
@@ -222,8 +250,16 @@ public final class HelixUtil {
// Convert the resulting BestPossibleStateOutput to Map<String, ResourceAssignment>
Map<String, ResourceAssignment> result = new HashMap<>();
BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+ if (output == null) {
+ throw new HelixException(
+ "getIdealAssignmentForWagedFullAuto(): Calculation failed: Failed to compute BestPossibleState!");
+ }
Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
+ if (resourceMap == null) {
+ throw new HelixException(
+ "getIdealAssignmentForWagedFullAuto(): Calculation failed: RESOURCES_TO_REBALANCE is null!");
+ }
for (Resource resource : resourceMap.values()) {
String resourceName = resource.getResourceName();
PartitionStateMap partitionStateMap = output.getPartitionStateMap(resourceName);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 2522696..e700524 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -26,6 +26,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.ConfigAccessor;
@@ -193,6 +195,30 @@ public class TestWagedRebalance extends ZkTestBase {
Assert.assertEquals(utilResult.get(idealState.getResourceName()).getRecord().getMapFields(),
idealState.getRecord().getMapFields());
}
+
+ // Try to add a few extra instances
+ String instance_0 = "instance_0";
+ String instance_1 = "instance_1";
+ Set<String> newInstances = new HashSet<>();
+ newInstances.add(instance_0);
+ newInstances.add(instance_1);
+ liveInstances.addAll(newInstances);
+ for (String instance : newInstances) {
+ InstanceConfig instanceConfig = new InstanceConfig(instance);
+ instanceConfigs.add(instanceConfig);
+ }
+
+ utilResult = HelixUtil
+ .getIdealAssignmentForWagedFullAuto(ZK_ADDR, clusterConfig, instanceConfigs, liveInstances,
+ idealStates, resourceConfigs);
+
+ Set<String> instancesWithAssignments = new HashSet<>();
+ utilResult.values().forEach(
+ resourceAssignment -> resourceAssignment.getRecord().getMapFields().values()
+ .forEach(entry -> instancesWithAssignments.addAll(entry.keySet())));
+ // The newly added instances should contain some partitions
+ Assert.assertTrue(instancesWithAssignments.contains(instance_0));
+ Assert.assertTrue(instancesWithAssignments.contains(instance_1));
}
@Test(dependsOnMethods = "test")