You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@helix.apache.org by ne...@apache.org on 2022/12/02 00:33:40 UTC
[helix] branch nealsun/waged-pipeline-redesign updated: WAGED pipeline redesign
This is an automated email from the ASF dual-hosted git repository.
nealsun pushed a commit to branch nealsun/waged-pipeline-redesign
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/nealsun/waged-pipeline-redesign by this push:
new 0936a2af2 WAGED pipeline redesign
0936a2af2 is described below
commit 0936a2af2a625cccd469a468ed2fc53c02f1b802
Author: Neal Sun <ne...@linkedin.com>
AuthorDate: Thu Dec 1 16:33:34 2022 -0800
WAGED pipeline redesign
Added a new WAGED pipeline stage (emergency rebalance) and altered rebalancer behaviors to improve performance.
---
.../rebalancer/waged/AssignmentMetadataStore.java | 116 +++++----
.../rebalancer/waged/ReadOnlyWagedRebalancer.java | 27 +-
.../rebalancer/waged/WagedRebalancer.java | 283 ++++++++++++++++-----
.../waged/model/ClusterModelProvider.java | 70 ++++-
.../java/org/apache/helix/model/ClusterConfig.java | 1 +
.../waged/MockAssignmentMetadataStore.java | 19 +-
.../waged/TestAssignmentMetadataStore.java | 24 --
.../rebalancer/waged/TestWagedRebalancer.java | 14 +-
8 files changed, 389 insertions(+), 165 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index dd502893e..bc4120cb4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -50,6 +50,7 @@ public class AssignmentMetadataStore {
private String _bestPossiblePath;
protected Map<String, ResourceAssignment> _globalBaseline;
protected Map<String, ResourceAssignment> _bestPossibleAssignment;
+ protected int _bestPossibleVersion = 0;
AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
@@ -92,60 +93,78 @@ public class AssignmentMetadataStore {
}
/**
- * @return true if a new baseline was persisted.
+ * @param newAssignment
+ * @param path the path of the assignment record
+ * @param key the key of the assignment in the record
* @throws HelixException if the method failed to persist the baseline.
*/
- public synchronized boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
- return persistAssignment(globalBaseline, getBaseline(), _baselinePath, BASELINE_KEY);
+ private void persistAssignmentToMetadataStore(Map<String, ResourceAssignment> newAssignment, String path, String key)
+ throws HelixException {
+ // TODO: Make the write async?
+ // Persist to ZK
+ HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
+ try {
+ _dataAccessor.compressedBucketWrite(path, combinedAssignments);
+ } catch (IOException e) {
+ throw new HelixException(String.format("Failed to persist %s assignment to path %s", key, path), e);
+ }
}
/**
- * @return true if a new best possible assignment was persisted.
- * @throws HelixException if the method failed to persist the baseline.
+ * Persist a new baseline assignment to metadata store first, then to memory
+ * @param globalBaseline
*/
- public synchronized boolean persistBestPossibleAssignment(
- Map<String, ResourceAssignment> bestPossibleAssignment) {
- return persistAssignment(bestPossibleAssignment, getBestPossibleAssignment(), _bestPossiblePath,
- BEST_POSSIBLE_KEY);
+ public synchronized void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+ // write to metadata store
+ persistAssignmentToMetadataStore(globalBaseline, _baselinePath, BASELINE_KEY);
+ // write to memory
+ getBaseline().clear();
+ getBaseline().putAll(globalBaseline);
}
- public synchronized void clearAssignmentMetadata() {
- persistAssignment(Collections.emptyMap(), getBaseline(), _baselinePath, BASELINE_KEY);
- persistAssignment(Collections.emptyMap(), getBestPossibleAssignment(), _bestPossiblePath,
- BEST_POSSIBLE_KEY);
+ /**
+ * Persist a new best possible assignment to metadata store first, then to memory.
+ * Increment best possible version by 1 - this is a high priority in-memory write.
+ * @param bestPossibleAssignment
+ */
+ public synchronized void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
+ // write to metadata store
+ persistAssignmentToMetadataStore(bestPossibleAssignment, _bestPossiblePath, BEST_POSSIBLE_KEY);
+ // write to memory
+ getBestPossibleAssignment().clear();
+ getBestPossibleAssignment().putAll(bestPossibleAssignment);
+ _bestPossibleVersion++;
}
/**
- * @param newAssignment
- * @param cachedAssignment
- * @param path the path of the assignment record
- * @param key the key of the assignment in the record
- * @return true if a new assignment was persisted.
+ * Attempts to persist Best Possible Assignment in memory from an asynchronous thread.
+ * Persist only happens when the provided version is not stale - this is a low priority in-memory write.
+ * @param bestPossibleAssignment - new assignment to be persisted
+ * @param newVersion - attempted new version to write. This version is obtained earlier from getBestPossibleVersion()
+ * @return true if the attempt succeeded, false otherwise.
*/
- // TODO: Enhance the return value so it is more intuitive to understand when the persist fails and
- // TODO: when it is skipped.
- private boolean persistAssignment(Map<String, ResourceAssignment> newAssignment,
- Map<String, ResourceAssignment> cachedAssignment, String path,
- String key) {
- // TODO: Make the write async?
- // If the assignment hasn't changed, skip writing to metadata store
- if (compareAssignments(cachedAssignment, newAssignment)) {
- return false;
- }
- // Persist to ZK
- HelixProperty combinedAssignments = combineAssignments(key, newAssignment);
- try {
- _dataAccessor.compressedBucketWrite(path, combinedAssignments);
- } catch (IOException e) {
- // TODO: Improve failure handling
- throw new HelixException(
- String.format("Failed to persist %s assignment to path %s", key, path), e);
+ public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
+ Map<String, ResourceAssignment> bestPossibleAssignment, int newVersion) {
+ // Check if the version is stale by this point
+ if (newVersion > _bestPossibleVersion) {
+ getBestPossibleAssignment().clear();
+ getBestPossibleAssignment().putAll(bestPossibleAssignment);
+ _bestPossibleVersion = newVersion;
+ return true;
}
- // Update the in-memory reference
- cachedAssignment.clear();
- cachedAssignment.putAll(newAssignment);
- return true;
+ return false;
+ }
+
+ public int getBestPossibleVersion() {
+ return _bestPossibleVersion;
+ }
+
+ public synchronized void clearAssignmentMetadata() {
+ persistAssignmentToMetadataStore(Collections.emptyMap(), _baselinePath, BASELINE_KEY);
+ persistAssignmentToMetadataStore(Collections.emptyMap(), _bestPossiblePath, BEST_POSSIBLE_KEY);
+ getBaseline().clear();
+ getBestPossibleAssignment().clear();
}
protected synchronized void reset() {
@@ -200,16 +219,11 @@ public class AssignmentMetadataStore {
return assignmentMap;
}
- /**
- * Returns whether two assignments are same.
- * @param oldAssignment
- * @param newAssignment
- * @return true if they are the same. False otherwise or oldAssignment is null
- */
- protected boolean compareAssignments(Map<String, ResourceAssignment> oldAssignment,
- Map<String, ResourceAssignment> newAssignment) {
- // If oldAssignment is null, that means that we haven't read from/written to
- // the metadata store yet. In that case, we return false so that we write to metadata store.
- return oldAssignment != null && oldAssignment.equals(newAssignment);
+ protected boolean isBaselineChanged(Map<String, ResourceAssignment> newBaseline) {
+ return !getBaseline().equals(newBaseline);
+ }
+
+ protected boolean isBestPossibleChanged(Map<String, ResourceAssignment> newBestPossible) {
+ return !getBestPossibleAssignment().equals(newBestPossible);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index e94148e99..711871381 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -67,26 +67,29 @@ public class ReadOnlyWagedRebalancer extends WagedRebalancer {
}
@Override
- public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
- // If baseline hasn't changed, skip updating the metadata store
- if (compareAssignments(_globalBaseline, globalBaseline)) {
- return false;
- }
+ public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
// Update the in-memory reference only
_globalBaseline = globalBaseline;
- return true;
}
@Override
- public boolean persistBestPossibleAssignment(
+ public void persistBestPossibleAssignment(
Map<String, ResourceAssignment> bestPossibleAssignment) {
- // If bestPossibleAssignment hasn't changed, skip updating the metadata store
- if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment)) {
- return false;
- }
// Update the in-memory reference only
_bestPossibleAssignment = bestPossibleAssignment;
- return true;
+ _bestPossibleVersion++;
+ }
+ @Override
+ public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
+ Map<String, ResourceAssignment> bestPossibleAssignment, int newVersion) {
+ // Check if the version is stale by this point
+ if (newVersion > _bestPossibleVersion) {
+ _bestPossibleAssignment = bestPossibleAssignment;
+ _bestPossibleVersion = newVersion;
+ return true;
+ }
+
+ return false;
}
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index a99ad26ab..e7c09ab65 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
@@ -98,6 +99,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
// To calculate the baseline asynchronously
private final ExecutorService _baselineCalculateExecutor;
+ private final ExecutorService _bestPossibleCalculateExecutor;
private final ResourceChangeDetector _changeDetector;
private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
@@ -114,6 +116,8 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
private final BaselineDivergenceGauge _baselineDivergenceGauge;
private boolean _asyncGlobalRebalanceEnabled;
+ private boolean _asyncPartialRebalanceEnabled;
+ private Future<Boolean> _asyncPartialRebalanceResult;
// Note, the rebalance algorithm field is mutable so it should not be directly referred except for
// the public method computeNewIdealStates.
@@ -149,7 +153,8 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
// cluster has converged.
helixManager == null ? new WagedRebalancerMetricCollector()
: new WagedRebalancerMetricCollector(helixManager.getClusterName()),
- ClusterConfig.DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED);
+ ClusterConfig.DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED,
+ ClusterConfig.DEFAULT_PARTIAL_REBALANCE_ASYNC_MODE_ENABLED);
_preference = ImmutableMap.copyOf(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
}
@@ -166,12 +171,13 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
// If metricCollector is not provided, instantiate a version that does not register metrics
// in order to allow rebalancer to proceed
metricCollectorOptional.orElse(new WagedRebalancerMetricCollector()),
- false);
+ false, false);
}
private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager,
- MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled) {
+ MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled,
+ boolean isAsyncPartialRebalanceEnabled) {
if (assignmentMetadataStore == null) {
LOG.warn("Assignment Metadata Store is not configured properly."
+ " The rebalancer will not access the assignment store during the rebalance.");
@@ -216,7 +222,9 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
_changeDetector = new ResourceChangeDetector(true);
_baselineCalculateExecutor = Executors.newSingleThreadExecutor();
+ _bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor();
_asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
+ _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
}
// Update the global rebalance mode to be asynchronous or synchronous
@@ -224,6 +232,11 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
_asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
}
+ // Update the partial rebalance mode to be asynchronous or synchronous
+ public void setPartialRebalanceAsyncMode(boolean isAsyncPartialRebalanceEnabled) {
+ _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
+ }
+
// Update the rebalancer preference if the new options are different from the current preference.
public synchronized void updateRebalancePreference(
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
@@ -249,6 +262,9 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
if (_baselineCalculateExecutor != null) {
_baselineCalculateExecutor.shutdownNow();
}
+ if (_bestPossibleCalculateExecutor != null) {
+ _bestPossibleCalculateExecutor.shutdownNow();
+ }
if (_assignmentMetadataStore != null) {
_assignmentMetadataStore.close();
}
@@ -329,22 +345,24 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
// Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
- Map<String, IdealState> newIdealStates = convertResourceAssignment(clusterData,
- computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput,
- algorithm));
+ Map<String, ResourceAssignment> newBestPossibleAssignment =
+ computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
+ Map<String, IdealState> newIdealStates = convertResourceAssignment(clusterData, newBestPossibleAssignment);
// The additional rebalance overwrite is required since the calculated mapping may contain
// some delayed rebalanced assignments.
- if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
+ if (!activeNodes.equals(clusterData.getEnabledLiveInstances()) && requireRebalanceOverwrite(clusterData,
+ newBestPossibleAssignment)) {
applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
- getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()),
- algorithm);
+ getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()), algorithm);
}
// Replace the assignment if user-defined preference list is configured.
// Note the user-defined list is intentionally applied to the final mapping after calculation.
// This is to avoid persisting it into the assignment store, which impacts the long term
// assignment evenness and partition movements.
- newIdealStates.forEach((key, value) -> applyUserDefinedPreferenceList(clusterData.getResourceConfig(key), value));
+ newIdealStates.forEach(
+ (resourceName, idealState) -> applyUserDefinedPreferenceList(clusterData.getResourceConfig(resourceName),
+ idealState));
return newIdealStates;
}
@@ -357,9 +375,10 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
throws HelixRebalanceException {
// Perform global rebalance for a new baseline assignment
globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
- // Perform partial rebalance for a new best possible assignment
+ // Perform emergency rebalance for a new best possible assignment
Map<String, ResourceAssignment> newAssignment =
- partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
+ emergencyRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
+
return newAssignment;
}
@@ -410,43 +429,22 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
* @param algorithm
* @throws HelixRebalanceException
*/
- private void globalRebalance(ResourceControllerDataProvider clusterData,
- Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput,
- RebalanceAlgorithm algorithm)
- throws HelixRebalanceException {
+ private void globalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+ final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException {
_changeDetector.updateSnapshots(clusterData);
// Get all the changed items' information. Filter for the items that have content changed.
- final Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
- _changeDetector.getAllChanges();
-
- if (clusterChanges.keySet().stream()
- .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
- // Build the cluster model for rebalance calculation.
- // Note, for a Baseline calculation,
- // 1. Ignore node status (disable/offline).
- // 2. Use the previous Baseline as the only parameter about the previous assignment.
- Map<String, ResourceAssignment> currentBaseline =
- getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
- ClusterModel clusterModel;
- try {
- clusterModel = ClusterModelProvider
- .generateClusterModelForBaseline(clusterData, resourceMap,
- clusterData.getAllInstances(), clusterChanges, currentBaseline);
- } catch (Exception ex) {
- throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
- HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
- }
+ final Map<HelixConstants.ChangeType, Set<String>> clusterChanges = _changeDetector.getAllChanges();
+ if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
- final String clusterName = clusterData.getClusterName();
// Calculate the Baseline assignment for global rebalance.
Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
try {
- // Note that we should schedule a new partial rebalance for a future rebalance pipeline if
- // the planned partial rebalance in the current rebalance pipeline won't wait for the new
- // baseline being calculated.
- // So set shouldSchedulePartialRebalance to be !waitForGlobalRebalance
- calculateAndUpdateBaseline(clusterModel, algorithm, !waitForGlobalRebalance, clusterName);
+ // If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should
+ // be triggered again after baseline is finished.
+ // Set shouldTriggerMainPipeline to be !waitForGlobalRebalance
+ doGlobalRebalance(clusterData, resourceMap, algorithm, currentStateOutput, !waitForGlobalRebalance,
+ clusterChanges);
} catch (HelixRebalanceException e) {
LOG.error("Failed to calculate baseline assignment!", e);
return false;
@@ -469,27 +467,40 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
/**
* Calculate and update the Baseline assignment
- * @param clusterModel
- * @param algorithm
- * @param shouldSchedulePartialRebalance True if the call should trigger a following partial rebalance
+ * @param shouldTriggerMainPipeline True if the call should trigger a following main pipeline rebalance
* so the new Baseline could be applied to cluster.
- * @param clusterName
- * @throws HelixRebalanceException
*/
- private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgorithm algorithm,
- boolean shouldSchedulePartialRebalance, String clusterName)
- throws HelixRebalanceException {
+ private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+ RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline,
+ Map<HelixConstants.ChangeType, Set<String>> clusterChanges) throws HelixRebalanceException {
LOG.info("Start calculating the new baseline.");
_baselineCalcCounter.increment(1L);
_baselineCalcLatency.startMeasuringLatency();
- boolean isBaselineChanged = false;
+ // Build the cluster model for rebalance calculation.
+ // Note, for a Baseline calculation,
+ // 1. Ignore node status (disable/offline).
+ // 2. Use the previous Baseline as the only parameter about the previous assignment.
+ Map<String, ResourceAssignment> currentBaseline =
+ getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
+ ClusterModel clusterModel;
+ try {
+ clusterModel =
+ ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap, clusterData.getAllInstances(),
+ clusterChanges, currentBaseline);
+ } catch (Exception ex) {
+ throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
+ HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+ }
+
Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterModel, algorithm);
+ boolean isBaselineChanged =
+ _assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline);
// Write the new baseline to metadata store
- if (_assignmentMetadataStore != null) {
+ if (isBaselineChanged) {
try {
_writeLatency.startMeasuringLatency();
- isBaselineChanged = _assignmentMetadataStore.persistBaseline(newBaseline);
+ _assignmentMetadataStore.persistBaseline(newBaseline);
_writeLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
@@ -501,21 +512,64 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
_baselineCalcLatency.endMeasuringLatency();
LOG.info("Global baseline calculation completed and has been persisted into metadata store.");
- if (isBaselineChanged && shouldSchedulePartialRebalance) {
+ if (isBaselineChanged && shouldTriggerMainPipeline) {
LOG.info("Schedule a new rebalance after the new baseline calculation has finished.");
- RebalanceUtil.scheduleOnDemandPipeline(clusterName, 0L, false);
+ RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
}
}
- private Map<String, ResourceAssignment> partialRebalance(
+ private void partialRebalance(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
+ // If partial rebalance is async and the previous result is not completed yet,
+ // do not start another partial rebalance.
+ if (_asyncPartialRebalanceEnabled && _asyncPartialRebalanceResult != null
+ && !_asyncPartialRebalanceResult.isDone()) {
+ return;
+ }
+
+ _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() -> {
+ try {
+ doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
+ currentStateOutput);
+ } catch (HelixRebalanceException e) {
+ LOG.error("Failed to calculate best possible assignment!", e);
+ return false;
+ }
+ return true;
+ });
+ if (!_asyncPartialRebalanceEnabled) {
+ try {
+ if (!_asyncPartialRebalanceResult.get()) {
+ throw new HelixRebalanceException("Failed to calculate for the new best possible.",
+ HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw new HelixRebalanceException("Failed to execute new best possible calculation.",
+ HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+ }
+ }
+ }
+
+ /**
+ * Calculate and update the Best Possible assignment
+ */
+ private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+ Set<String> activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput)
+ throws HelixRebalanceException {
LOG.info("Start calculating the new best possible assignment.");
_partialRebalanceCounter.increment(1L);
_partialRebalanceLatency.startMeasuringLatency();
- // TODO: Consider combining the metrics for both baseline/best possible?
+
+ int newBestPossibleAssignmentVersion = -1;
+ if (_assignmentMetadataStore != null) {
+ newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1;
+ } else {
+ LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version.");
+ }
+
// Read the baseline from metadata store
Map<String, ResourceAssignment> currentBaseline =
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
@@ -547,20 +601,73 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
_baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(),
currentBaseline, newAssignmentCopy);
- if (_assignmentMetadataStore != null) {
- try {
- _writeLatency.startMeasuringLatency();
- _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
- _writeLatency.endMeasuringLatency();
- } catch (Exception ex) {
- throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
- HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
- }
+ boolean bestPossibleUpdateSuccessful = false;
+ if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
+ bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
+ newBestPossibleAssignmentVersion);
} else {
LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
}
_partialRebalanceLatency.endMeasuringLatency();
LOG.info("Finish calculating the new best possible assignment.");
+
+ if (bestPossibleUpdateSuccessful) {
+ LOG.info("Schedule a new rebalance after the new best possible calculation has finished.");
+ RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
+ }
+ }
+
+ private Map<String, ResourceAssignment> emergencyRebalance(
+ ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+ Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
+ RebalanceAlgorithm algorithm)
+ throws HelixRebalanceException {
+ Map<String, ResourceAssignment> currentBestPossibleAssignment =
+ getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+ resourceMap.keySet());
+
+ // Step 1: Check for permanent node down
+ AtomicBoolean allNodesActive = new AtomicBoolean(true);
+ currentBestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> {
+ resourceAssignment.getMappedPartitions().parallelStream().forEach(partition -> {
+ for (String instance : resourceAssignment.getReplicaMap(partition).keySet()) {
+ if (!activeNodes.contains(instance)) {
+ allNodesActive.set(false);
+ break;
+ }
+ }
+ });
+ }));
+
+
+ // Step 2: if there are permanent node downs, calculate for a new one best possible
+ Map<String, ResourceAssignment> newAssignment;
+ if (!allNodesActive.get()) {
+ ClusterModel clusterModel;
+ try {
+ clusterModel =
+ ClusterModelProvider.generateClusterModelForEmergencyRebalance(clusterData, resourceMap, activeNodes,
+ currentBestPossibleAssignment);
+ } catch (Exception ex) {
+ throw new HelixRebalanceException("Failed to generate cluster model for emergency rebalance.",
+ HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+ }
+ newAssignment = calculateAssignment(clusterModel, algorithm);
+ } else {
+ newAssignment = currentBestPossibleAssignment;
+ }
+
+ // Step 3: persist result to metadata store
+ persistBestPossibleAssignment(newAssignment);
+ LOG.info("Finish emergency rebalance");
+
+ partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
+ if (!_asyncPartialRebalanceEnabled) {
+ newAssignment = getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+ resourceMap.keySet());
+ persistBestPossibleAssignment(newAssignment);
+ }
+
return newAssignment;
}
@@ -684,6 +791,22 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
return currentBestAssignment;
}
+ private void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment)
+ throws HelixRebalanceException {
+ if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(bestPossibleAssignment)) {
+ try {
+ _writeLatency.startMeasuringLatency();
+ _assignmentMetadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
+ _writeLatency.endMeasuringLatency();
+ } catch (Exception ex) {
+ throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
+ HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+ }
+ } else {
+ LOG.debug("Assignment Metadata Store is null. Skip persisting the best possible assignment.");
+ }
+ }
+
/**
* Schedule rebalance according to the delayed rebalance logic.
* @param clusterData the current cluster data cache
@@ -710,6 +833,34 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
}
}
+ private boolean requireRebalanceOverwrite(ResourceControllerDataProvider clusterData,
+ Map<String, ResourceAssignment> bestPossibleAssignment) {
+ AtomicBoolean allMinActiveReplicaMet = new AtomicBoolean(true);
+ bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> {
+ String resourceName = resourceAssignment.getResourceName();
+ IdealState currentIdealState = clusterData.getIdealState(resourceName);
+ Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+ int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
+ int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
+ .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
+ currentIdealState), currentIdealState, numReplica);
+ resourceAssignment.getMappedPartitions().parallelStream().forEach(partition -> {
+ int enabledLivePlacementCounter = 0;
+ for (String instance : resourceAssignment.getReplicaMap(partition).keySet()) {
+ if (enabledLiveInstances.contains(instance)) {
+ enabledLivePlacementCounter++;
+ }
+ }
+
+ if (enabledLivePlacementCounter < Math.min(minActiveReplica, numReplica)) {
+ allMinActiveReplicaMet.set(false);
+ }
+ });
+ }));
+
+ return !allMinActiveReplicaMet.get();
+ }
+
/**
* Update the rebalanced ideal states according to the real active nodes.
* Since the rebalancing might be done with the delayed logic, the rebalanced ideal states
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 0fc1f2b05..ce82c3207 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -51,7 +51,31 @@ public class ClusterModelProvider {
PARTIAL,
// Set the rebalance scope to cover all replicas that need relocation based on the cluster
// changes.
- GLOBAL_BASELINE
+ GLOBAL_BASELINE,
+ // Set the rebalance scope to cover only replicas that are assigned to downed instances.
+ EMERGENCY
+ }
+
+ /**
+ * Generate a new Cluster Model object according to the current cluster status for emergency
+ * rebalance. The EMERGENCY scope only selects replicas currently allocated to instances outside
+ * {@code activeInstances}; replicas on active instances keep their current placement.
+ * @param dataProvider The controller's data cache.
+ * @param resourceMap The full list of the resources to be rebalanced. Note that any
+ * resources that are not in this list will be removed from the
+ * final assignment.
+ * @param activeInstances The active instances that will be used in the calculation.
+ * Replicas allocated to any other instance are marked for reassignment. This
+ * list can differ from the real active node list according to the rebalancer logic.
+ * @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the
+ * previous rebalance. The two other map arguments passed on below are empty.
+ * @return the new cluster model
+ */
+ public static ClusterModel generateClusterModelForEmergencyRebalance(ResourceControllerDataProvider dataProvider,
+ Map<String, Resource> resourceMap, Set<String> activeInstances,
+ Map<String, ResourceAssignment> bestPossibleAssignment) {
+ return generateClusterModel(dataProvider, resourceMap, activeInstances, Collections.emptyMap(),
+ Collections.emptyMap(), bestPossibleAssignment, RebalanceScopeType.EMERGENCY);
 }
/**
@@ -165,6 +189,10 @@ public class ClusterModelProvider {
findToBeAssignedReplicasByComparingWithIdealAssignment(replicaMap, activeInstances,
idealAssignment, currentAssignment, allocatedReplicas);
break;
+ case EMERGENCY:
+ toBeAssignedReplicas = findToBeAssignedReplicasOnDownInstances(replicaMap, activeInstances,
+ currentAssignment, allocatedReplicas);
+ break;
default:
throw new HelixException("Unknown rebalance scope type: " + scopeType);
}
@@ -391,6 +419,50 @@ public class ClusterModelProvider {
 return toBeAssignedReplicas;
 }
+  /**
+   * Find the replicas that the current assignment places on non-active (down) instances.
+   * Replicas whose current instance is still active are added to {@code allocatedReplicas}
+   * instead, and replicas with no matching allocation in the current assignment are added to
+   * neither.
+   *
+   * @param replicaMap A map contains all the replicas grouped by resource name.
+   * @param activeInstances All the instances that are live and enabled according to the delay
+   *                        rebalance configuration.
+   * @param currentAssignment The current assignment that was generated in the previous rebalance.
+   * @param allocatedReplicas A map of <Instance -> replicas> to return the allocated replicas
+   *                          grouped by the target instance name.
+   * @return The replicas that need to be reassigned.
+   */
+  private static Set<AssignableReplica> findToBeAssignedReplicasOnDownInstances(
+      Map<String, Set<AssignableReplica>> replicaMap, Set<String> activeInstances,
+      Map<String, ResourceAssignment> currentAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Set<AssignableReplica> replicasOnDownInstances = new HashSet<>();
+    for (Map.Entry<String, Set<AssignableReplica>> resourceEntry : replicaMap.entrySet()) {
+      // partition -> state -> instances holding that state. Matched instances are removed from
+      // these sets so that replicas sharing a partition and state map to distinct instances.
+      Map<String, Map<String, Set<String>>> partitionStateInstances =
+          getStateInstanceMap(currentAssignment.get(resourceEntry.getKey()));
+      for (AssignableReplica replica : resourceEntry.getValue()) {
+        Set<String> candidates = partitionStateInstances
+            .getOrDefault(replica.getPartitionName(), Collections.emptyMap())
+            .getOrDefault(replica.getReplicaState(), Collections.emptySet());
+        if (candidates.isEmpty()) {
+          // No current allocation to match: the replica goes into neither result.
+          continue;
+        }
+        String currentInstance = candidates.iterator().next();
+        if (activeInstances.contains(currentInstance)) {
+          allocatedReplicas.computeIfAbsent(currentInstance, key -> new HashSet<>()).add(replica);
+        } else {
+          replicasOnDownInstances.add(replica);
+        }
+        candidates.remove(currentInstance);
+      }
+    }
+    return replicasOnDownInstances;
+  }
+
/**
* Filter to remove all invalid allocations that are not on the active instances.
* @param assignment
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 829724142..c4f9f914f 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -184,6 +184,9 @@ public class ClusterConfig extends HelixProperty {
 private final static int MAX_REBALANCE_PREFERENCE = 1000;
 private final static int MIN_REBALANCE_PREFERENCE = 0;
 public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
+  // Default for whether partial rebalance runs in async mode; the partial-rebalance counterpart
+  // of DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED.
+ public final static boolean DEFAULT_PARTIAL_REBALANCE_ASYNC_MODE_ENABLED = true;
 private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
 private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET = -1;
 private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index a2220a5d4..fb59b8146 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -39,19 +39,43 @@ public class MockAssignmentMetadataStore extends AssignmentMetadataStore {
 return _globalBaseline == null ? Collections.emptyMap() : _globalBaseline;
 }
- public boolean persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+ public void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
 _globalBaseline = globalBaseline;
- return true;
 }
 public Map<String, ResourceAssignment> getBestPossibleAssignment() {
 return _bestPossibleAssignment == null ? Collections.emptyMap() : _bestPossibleAssignment;
 }
- public boolean persistBestPossibleAssignment(
+  /**
+   * Stores the assignment in memory and increments the version. Synchronized because
+   * asyncUpdateBestPossibleAssignmentCache reads and writes _bestPossibleVersion under the same
+   * lock; an unsynchronized increment here would race with it.
+   */
+  public synchronized void persistBestPossibleAssignment(
 Map<String, ResourceAssignment> bestPossibleAssignment) {
 _bestPossibleAssignment = bestPossibleAssignment;
- return true;
+ _bestPossibleVersion++;
+ }
+
+  /**
+   * Replaces the cached best possible assignment if {@code newVersion} is newer than the stored
+   * version.
+   *
+   * @param bestPossibleAssignment the assignment to cache
+   * @param newVersion the version of {@code bestPossibleAssignment}
+   * @return true if the cache was updated; false if the stored version is already at least
+   *         {@code newVersion}.
+   */
+  public synchronized boolean asyncUpdateBestPossibleAssignmentCache(
+      Map<String, ResourceAssignment> bestPossibleAssignment, int newVersion) {
+    // Reject stale versions: a newer assignment may already have been stored.
+    if (newVersion <= _bestPossibleVersion) {
+      return false;
+    }
+    _bestPossibleAssignment = bestPossibleAssignment;
+    _bestPossibleVersion = newVersion;
+    return true;
 }
public void close() {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index 84bc82957..7c5740423 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -96,31 +96,7 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
Assert.assertTrue(_store.getBestPossibleAssignment().isEmpty());
}
- /**
- * Test that if the old assignment and new assignment are the same,
- */
@Test(dependsOnMethods = "testReadEmptyBaseline")
- public void testAvoidingRedundantWrite() {
- Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
-
- // Call persist functions
- _store.persistBaseline(dummyAssignment);
- _store.persistBestPossibleAssignment(dummyAssignment);
-
- // Check that only one version exists
- Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
- Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 1);
-
- // Call persist functions again
- _store.persistBaseline(dummyAssignment);
- _store.persistBestPossibleAssignment(dummyAssignment);
-
- // Check that only one version exists still
- Assert.assertEquals(getExistingVersionNumbers(BASELINE_KEY).size(), 1);
- Assert.assertEquals(getExistingVersionNumbers(BEST_POSSIBLE_KEY).size(), 1);
- }
-
- @Test(dependsOnMethods = "testAvoidingRedundantWrite")
public void testAssignmentCache() {
Map<String, ResourceAssignment> dummyAssignment = getDummyAssignment();
// Call persist functions
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index cfcbe298e..5d4047d5b 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -255,8 +255,8 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
Map<String, ResourceAssignment> testResourceAssignmentMap = new HashMap<>();
ZNRecord mappingNode = new ZNRecord(_resourceNames.get(0));
HashMap<String, String> mapping = new HashMap<>();
- mapping.put(_partitionNames.get(0), "MASTER");
- mappingNode.setMapField(_testInstanceId, mapping);
+ mapping.put(_testInstanceId, "MASTER");
+ mappingNode.setMapField(_partitionNames.get(0), mapping);
testResourceAssignmentMap.put(_resourceNames.get(0), new ResourceAssignment(mappingNode));
_metadataStore.reset();
@@ -373,9 +373,9 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
Assert.fail("Rebalance shall fail.");
} catch (HelixRebalanceException ex) {
- Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS);
+ Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
Assert.assertEquals(ex.getMessage(),
- "Failed to generate cluster model for partial rebalance. Failure Type: INVALID_CLUSTER_STATUS");
+ "Failed to calculate for the new best possible. Failure Type: FAILED_TO_CALCULATE");
}
// The rebalance will be done with empty mapping result since there is no previously calculated
@@ -389,7 +389,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
public void testInvalidRebalancerStatus() throws IOException {
// Mock a metadata store that will fail on all the calls.
AssignmentMetadataStore metadataStore = Mockito.mock(AssignmentMetadataStore.class);
- when(metadataStore.getBaseline())
+ when(metadataStore.getBestPossibleAssignment())
.thenThrow(new RuntimeException("Mock Error. Metadata store fails."));
WagedRebalancer rebalancer = new WagedRebalancer(metadataStore, _algorithm, Optional.empty());
@@ -404,7 +404,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
Assert.assertEquals(ex.getFailureType(),
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS);
Assert.assertEquals(ex.getMessage(),
- "Failed to get the current baseline assignment because of unexpected error. Failure Type: INVALID_REBALANCER_STATUS");
+ "Failed to get the current best possible assignment because of unexpected error. Failure Type: INVALID_REBALANCER_STATUS");
}
}
@@ -439,7 +439,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
Assert.fail("Rebalance shall fail.");
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
- Assert.assertEquals(ex.getMessage(), "Algorithm fails. Failure Type: FAILED_TO_CALCULATE");
+ Assert.assertEquals(ex.getMessage(), "Failed to calculate for the new best possible. Failure Type: FAILED_TO_CALCULATE");
}
// But if call with the public method computeNewIdealStates(), the rebalance will return with
// the previous rebalance result.