You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/06 04:54:28 UTC
[helix] branch master updated: Refactor WagedRebalancer and add comments (#2431)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 0e93568d6 Refactor WagedRebalancer and add comments (#2431)
0e93568d6 is described below
commit 0e93568d63d1d90b13d0e5fe43a9be5c84551025
Author: Qi (Quincy) Qu <qq...@linkedin.com>
AuthorDate: Thu Apr 6 00:54:17 2023 -0400
Refactor WagedRebalancer and add comments (#2431)
Refactor WagedRebalancer and add comments
Create standalone classes for partial and global rebalance to make the
class more modular and easier to manage.
---
.../rebalancer/util/WagedRebalanceUtil.java | 53 +++
.../rebalancer/waged/AssignmentManager.java | 106 ++++++
.../rebalancer/waged/GlobalRebalanceRunner.java | 216 ++++++++++++
.../rebalancer/waged/PartialRebalanceRunner.java | 208 ++++++++++++
.../rebalancer/waged/WagedRebalancer.java | 376 ++-------------------
.../rebalancer/waged/TestWagedRebalancer.java | 5 -
6 files changed, 610 insertions(+), 354 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
new file mode 100644
index 000000000..62a5fb515
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
@@ -0,0 +1,53 @@
+package org.apache.helix.controller.rebalancer.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
+import org.apache.helix.model.ResourceAssignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class WagedRebalanceUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(WagedRebalanceUtil.class);
+
+  private WagedRebalanceUtil() {
+    // Static utility holder; not meant to be instantiated.
+  }
+
+  /**
+   * Run the given rebalance algorithm against the cluster model and return the resulting
+   * assignment. The start and the end (with elapsed milliseconds) of the calculation are logged
+   * at INFO level.
+   *
+   * @param clusterModel the cluster model that contains all the cluster status for the purpose of
+   *                     rebalancing.
+   * @param algorithm    the rebalance algorithm used to compute the assignment.
+   * @return the new optimal assignment for the resources.
+   * @throws HelixRebalanceException propagated from {@link RebalanceAlgorithm#calculate} when the
+   *                                 algorithm fails to compute an assignment.
+   */
+  public static Map<String, ResourceAssignment> calculateAssignment(ClusterModel clusterModel,
+      RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+    String algorithmName = algorithm.getClass().getSimpleName();
+    long startTime = System.currentTimeMillis();
+    LOG.info("Start calculating for an assignment with algorithm {}", algorithmName);
+    OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel);
+    Map<String, ResourceAssignment> newAssignment =
+        optimalAssignment.getOptimalResourceAssignment();
+    LOG.info("Finish calculating an assignment with algorithm {}. Took: {} ms.", algorithmName,
+        System.currentTimeMillis() - startTime);
+    return newAssignment;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java
new file mode 100644
index 000000000..475e8aad1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentManager.java
@@ -0,0 +1,106 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+
+
+/**
+ * A manager class for fetching assignments from the metadata store. Resources that have no record
+ * in the metadata store fall back to their current state assignment.
+ */
+class AssignmentManager {
+  private final LatencyMetric _stateReadLatency;
+
+  /**
+   * @param stateReadLatency metric used to measure the latency of reads from the metadata store.
+   */
+  public AssignmentManager(LatencyMetric stateReadLatency) {
+    _stateReadLatency = stateReadLatency;
+  }
+
+  /**
+   * @param assignmentMetadataStore the store to read the baseline from; if null, every resource
+   *                                falls back to its current state assignment.
+   * @param currentStateOutput      source of the fallback (current state) assignments.
+   * @param resources               the resources to return assignments for; stored entries for
+   *                                any other resource are dropped.
+   * @return The current baseline assignment. If record does not exist in the
+   *         assignmentMetadataStore, return the current state assignment.
+   * @throws HelixRebalanceException if reading the baseline from the metadata store fails.
+   */
+  public Map<String, ResourceAssignment> getBaselineAssignment(
+      AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
+      Set<String> resources) throws HelixRebalanceException {
+    Map<String, ResourceAssignment> currentBaseline = new HashMap<>();
+    if (assignmentMetadataStore != null) {
+      try {
+        _stateReadLatency.startMeasuringLatency();
+        currentBaseline = new HashMap<>(assignmentMetadataStore.getBaseline());
+        _stateReadLatency.endMeasuringLatency();
+      } catch (Exception ex) {
+        throw new HelixRebalanceException(
+            "Failed to get the current baseline assignment because of unexpected error.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    }
+    // For resources without baseline, fall back to current state assignments
+    return fillMissingWithCurrentState(currentBaseline, currentStateOutput, resources);
+  }
+
+  /**
+   * @param assignmentMetadataStore the store to read the best possible assignment from; if null,
+   *                                every resource falls back to its current state assignment.
+   * @param currentStateOutput      source of the fallback (current state) assignments.
+   * @param resources               the resources to return assignments for; stored entries for
+   *                                any other resource are dropped.
+   * @return The current best possible assignment. If record does not exist in the
+   *         assignmentMetadataStore, return the current state assignment.
+   * @throws HelixRebalanceException if reading the best possible assignment from the metadata
+   *                                 store fails.
+   */
+  public Map<String, ResourceAssignment> getBestPossibleAssignment(
+      AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
+      Set<String> resources) throws HelixRebalanceException {
+    Map<String, ResourceAssignment> currentBestAssignment = new HashMap<>();
+    if (assignmentMetadataStore != null) {
+      try {
+        _stateReadLatency.startMeasuringLatency();
+        currentBestAssignment = new HashMap<>(assignmentMetadataStore.getBestPossibleAssignment());
+        _stateReadLatency.endMeasuringLatency();
+      } catch (Exception ex) {
+        throw new HelixRebalanceException(
+            "Failed to get the current best possible assignment because of unexpected error.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    }
+    // For resources without best possible states, fall back to current state assignments
+    return fillMissingWithCurrentState(currentBestAssignment, currentStateOutput, resources);
+  }
+
+  /**
+   * Restrict {@code storedAssignment} to {@code resources}, then add the current state assignment
+   * for every requested resource that has no stored entry. Mutates and returns
+   * {@code storedAssignment}, which must therefore be a modifiable map.
+   */
+  private static Map<String, ResourceAssignment> fillMissingWithCurrentState(
+      Map<String, ResourceAssignment> storedAssignment, CurrentStateOutput currentStateOutput,
+      Set<String> resources) {
+    storedAssignment.keySet().retainAll(resources);
+    Set<String> missingResources = new HashSet<>(resources);
+    missingResources.removeAll(storedAssignment.keySet());
+    storedAssignment.putAll(currentStateOutput.getAssignment(missingResources));
+    return storedAssignment;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
new file mode 100644
index 000000000..6130e5c52
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java
@@ -0,0 +1,216 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.changedetector.ResourceChangeDetector;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.metrics.MetricCollector;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+import org.apache.helix.util.RebalanceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Global Rebalance does the baseline recalculation when certain changes happen.
+ * The Global Baseline calculation does not consider any temporary status, such as participants' offline/disabled.
+ * Baseline is used as an anchor for {@link PartialRebalanceRunner}. Its computation takes previous baseline as an input.
+ * The Baseline is NOT directly propagated to the final output. It is consumed by the {@link PartialRebalanceRunner}
+ * as an important parameter.
+ */
+class GlobalRebalanceRunner implements AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(GlobalRebalanceRunner.class);
+
+  // When any of the following change happens, the rebalancer needs to do a global rebalance which
+  // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
+  private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
+      ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE,
+          HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
+
+  // To calculate the baseline asynchronously
+  private final ExecutorService _baselineCalculateExecutor;
+  private final ResourceChangeDetector _changeDetector;
+  private final AssignmentManager _assignmentManager;
+  private final AssignmentMetadataStore _assignmentMetadataStore;
+  private final LatencyMetric _writeLatency;
+  private final CountMetric _baselineCalcCounter;
+  private final LatencyMetric _baselineCalcLatency;
+  private final CountMetric _rebalanceFailureCount;
+
+  // volatile so that a mode change made through setGlobalRebalanceAsyncMode() is visible to the
+  // next globalRebalance() call even if the two are invoked from different threads.
+  private volatile boolean _asyncGlobalRebalanceEnabled;
+
+  /**
+   * @param assignmentManager             reads the previous baseline (with current-state fallback).
+   * @param assignmentMetadataStore       store the new baseline is persisted to; may be null.
+   * @param metricCollector               source of the baseline calculation counter and latency
+   *                                      metrics.
+   * @param writeLatency                  metric measuring the latency of persisting the baseline.
+   * @param rebalanceFailureCount         incremented when an asynchronous baseline calculation fails.
+   * @param isAsyncGlobalRebalanceEnabled true to calculate the baseline without blocking the caller.
+   */
+  public GlobalRebalanceRunner(AssignmentManager assignmentManager,
+      AssignmentMetadataStore assignmentMetadataStore,
+      MetricCollector metricCollector,
+      LatencyMetric writeLatency,
+      CountMetric rebalanceFailureCount,
+      boolean isAsyncGlobalRebalanceEnabled) {
+    _baselineCalculateExecutor = Executors.newSingleThreadExecutor();
+    _assignmentManager = assignmentManager;
+    _assignmentMetadataStore = assignmentMetadataStore;
+    _changeDetector = new ResourceChangeDetector(true);
+    _writeLatency = writeLatency;
+    _baselineCalcCounter = metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
+        CountMetric.class);
+    _baselineCalcLatency = metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge.name(),
+        LatencyMetric.class);
+    _rebalanceFailureCount = rebalanceFailureCount;
+    _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
+  }
+
+  /**
+   * Global rebalance calculates for a new baseline assignment.
+   * The new baseline assignment will be persisted and leveraged by the partial rebalance.
+   * Nothing is calculated unless the change detector reports at least one change of a type in
+   * {@code GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES}. In sync mode this call blocks until the
+   * calculation finishes; in async mode it returns right after scheduling it.
+   *
+   * @param clusterData        the cluster data cache the change snapshot and cluster model are built from.
+   * @param resourceMap        the resources to calculate the baseline for.
+   * @param currentStateOutput fallback assignment for resources that have no stored baseline.
+   * @param algorithm          the rebalance algorithm used to calculate the baseline.
+   * @throws HelixRebalanceException in sync mode, if the baseline calculation fails or is interrupted.
+   */
+  public void globalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException {
+    _changeDetector.updateSnapshots(clusterData);
+    // Get all the changed items' information. Filter for the items that have content changed.
+    final Map<HelixConstants.ChangeType, Set<String>> clusterChanges = _changeDetector.getAllChanges();
+
+    if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
+      // Snapshot the mode once so that this thread and the executor thread agree on it, even if
+      // setGlobalRebalanceAsyncMode() is called while the calculation is queued or running.
+      final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
+      // Calculate the Baseline assignment for global rebalance.
+      Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
+        try {
+          // If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should
+          // be triggered again after baseline is finished.
+          // Set shouldTriggerMainPipeline to be !waitForGlobalRebalance
+          doGlobalRebalance(clusterData, resourceMap, algorithm, currentStateOutput, !waitForGlobalRebalance,
+              clusterChanges);
+        } catch (HelixRebalanceException e) {
+          // In sync mode the failure surfaces to the caller as the exception thrown below; only
+          // count it here in async mode, where no caller observes the result.
+          if (!waitForGlobalRebalance) {
+            _rebalanceFailureCount.increment(1L);
+          }
+          LOG.error("Failed to calculate baseline assignment!", e);
+          return false;
+        }
+        return true;
+      });
+      if (waitForGlobalRebalance) {
+        try {
+          if (!result.get()) {
+            throw new HelixRebalanceException("Failed to calculate for the new Baseline.",
+                HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+          }
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag so callers up the stack can still observe the interruption.
+          Thread.currentThread().interrupt();
+          throw new HelixRebalanceException("Failed to execute new Baseline calculation.",
+              HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+        } catch (ExecutionException e) {
+          throw new HelixRebalanceException("Failed to execute new Baseline calculation.",
+              HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Calculate and update the Baseline assignment. The new Baseline is persisted only when a
+   * metadata store is configured and the calculated Baseline differs from the stored one.
+   * @param shouldTriggerMainPipeline True if the call should trigger a following main pipeline rebalance
+   *                                  so the new Baseline could be applied to cluster. A pipeline is
+   *                                  only scheduled when a changed Baseline was persisted.
+   */
+  private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline,
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges) throws HelixRebalanceException {
+    LOG.info("Start calculating the new baseline.");
+    _baselineCalcCounter.increment(1L);
+    _baselineCalcLatency.startMeasuringLatency();
+
+    // Build the cluster model for rebalance calculation.
+    // Note, for a Baseline calculation,
+    // 1. Ignore node status (disable/offline).
+    // 2. Use the previous Baseline as the only parameter about the previous assignment.
+    Map<String, ResourceAssignment> currentBaseline =
+        _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
+    ClusterModel clusterModel;
+    try {
+      clusterModel =
+          ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap, clusterData.getAllInstances(),
+              clusterChanges, currentBaseline);
+    } catch (Exception ex) {
+      throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
+          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+    }
+
+    Map<String, ResourceAssignment> newBaseline = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
+    boolean isBaselineChanged =
+        _assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline);
+    // Write the new baseline to metadata store
+    if (isBaselineChanged) {
+      try {
+        _writeLatency.startMeasuringLatency();
+        _assignmentMetadataStore.persistBaseline(newBaseline);
+        _writeLatency.endMeasuringLatency();
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    } else if (_assignmentMetadataStore == null) {
+      LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
+    } else {
+      LOG.debug("Baseline assignment is unchanged. Skip persisting the baseline assignment.");
+    }
+    _baselineCalcLatency.endMeasuringLatency();
+
+    if (isBaselineChanged) {
+      LOG.info("Global baseline calculation completed and has been persisted into metadata store.");
+      if (shouldTriggerMainPipeline) {
+        LOG.info("Schedule a new rebalance after the new baseline calculation has finished.");
+        RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
+      }
+    } else {
+      LOG.info("Global baseline calculation completed. No new baseline was persisted.");
+    }
+  }
+
+  /**
+   * @param isAsyncGlobalRebalanceEnabled true to make subsequent {@link #globalRebalance} calls
+   *                                      return without waiting for the baseline calculation.
+   */
+  public void setGlobalRebalanceAsyncMode(boolean isAsyncGlobalRebalanceEnabled) {
+    _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
+  }
+
+  public ResourceChangeDetector getChangeDetector() {
+    return _changeDetector;
+  }
+
+  /** Reset the change detector snapshots. */
+  public void resetChangeDetector() {
+    _changeDetector.resetSnapshots();
+  }
+
+  /** Stop the baseline executor, interrupting any running calculation via shutdownNow(). */
+  @Override
+  public void close() {
+    if (_baselineCalculateExecutor != null) {
+      _baselineCalculateExecutor.shutdownNow();
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
new file mode 100644
index 000000000..74982f6e3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java
@@ -0,0 +1,208 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.metrics.MetricCollector;
+import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+import org.apache.helix.util.RebalanceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Compute the best possible assignment based on the Baseline and the previous Best Possible assignment.
+ * The coordinator compares the previous Best Possible assignment with the current cluster state so as to derive a
+ * minimal rebalance scope. In short, the rebalance scope only contains the following two types of partitions.
+ * 1. The partition's current assignment becomes invalid.
+ * 2. The Baseline contains some new partition assignments that do not exist in the current assignment.
+ */
+class PartialRebalanceRunner implements AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PartialRebalanceRunner.class);
+
+  private final ExecutorService _bestPossibleCalculateExecutor;
+  private final AssignmentManager _assignmentManager;
+  private final AssignmentMetadataStore _assignmentMetadataStore;
+  private final BaselineDivergenceGauge _baselineDivergenceGauge;
+  private final CountMetric _rebalanceFailureCount;
+  private final CountMetric _partialRebalanceCounter;
+  private final LatencyMetric _partialRebalanceLatency;
+
+  // volatile so that a mode change made through setPartialRebalanceAsyncMode() is visible to the
+  // next partialRebalance() call even if the two are invoked from different threads.
+  private volatile boolean _asyncPartialRebalanceEnabled;
+  // Result of the most recently submitted calculation; only accessed within partialRebalance().
+  private Future<Boolean> _asyncPartialRebalanceResult;
+
+  /**
+   * @param assignmentManager              reads the baseline and best possible assignments (with
+   *                                       current-state fallback).
+   * @param assignmentMetadataStore        store the new best possible assignment is written to; may
+   *                                       be null.
+   * @param metricCollector                source of the partial rebalance and baseline divergence
+   *                                       metrics.
+   * @param rebalanceFailureCount          incremented when an asynchronous calculation fails.
+   * @param isAsyncPartialRebalanceEnabled true to calculate without blocking the caller.
+   */
+  public PartialRebalanceRunner(AssignmentManager assignmentManager,
+      AssignmentMetadataStore assignmentMetadataStore,
+      MetricCollector metricCollector,
+      CountMetric rebalanceFailureCount,
+      boolean isAsyncPartialRebalanceEnabled) {
+    _assignmentManager = assignmentManager;
+    _assignmentMetadataStore = assignmentMetadataStore;
+    _bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor();
+    _rebalanceFailureCount = rebalanceFailureCount;
+    _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
+
+    _partialRebalanceCounter = metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
+        CountMetric.class);
+    _partialRebalanceLatency = metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
+            .name(),
+        LatencyMetric.class);
+    _baselineDivergenceGauge = metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
+        BaselineDivergenceGauge.class);
+  }
+
+  /**
+   * Schedule a best possible calculation. In sync mode this blocks until it finishes; in async
+   * mode it returns immediately, and is a no-op while a previous calculation is still running.
+   *
+   * @param clusterData        the cluster data cache the cluster model is built from.
+   * @param resourceMap        the resources to calculate the assignment for.
+   * @param activeNodes        the nodes passed to the partial-rebalance cluster model.
+   * @param currentStateOutput fallback assignment for resources without stored assignments.
+   * @param algorithm          the rebalance algorithm used for the calculation.
+   * @throws HelixRebalanceException in sync mode, if the calculation fails or is interrupted.
+   */
+  public void partialRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      Set<String> activeNodes, final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
+      throws HelixRebalanceException {
+    // Snapshot the mode once so that this thread and the executor thread agree on it, even if
+    // setPartialRebalanceAsyncMode() is called while the calculation is queued or running.
+    final boolean isAsync = _asyncPartialRebalanceEnabled;
+    // If partial rebalance is async and the previous result is not completed yet,
+    // do not start another partial rebalance.
+    if (isAsync && _asyncPartialRebalanceResult != null && !_asyncPartialRebalanceResult.isDone()) {
+      return;
+    }
+
+    Future<Boolean> result = _bestPossibleCalculateExecutor.submit(() -> {
+      try {
+        doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm, currentStateOutput);
+      } catch (HelixRebalanceException e) {
+        // In sync mode the failure surfaces to the caller as the exception thrown below; only
+        // count it here in async mode, where no caller observes the result.
+        if (isAsync) {
+          _rebalanceFailureCount.increment(1L);
+        }
+        LOG.error("Failed to calculate best possible assignment!", e);
+        return false;
+      }
+      return true;
+    });
+    _asyncPartialRebalanceResult = result;
+    if (!isAsync) {
+      try {
+        if (!result.get()) {
+          throw new HelixRebalanceException("Failed to calculate for the new best possible.",
+              HelixRebalanceException.Type.FAILED_TO_CALCULATE);
+        }
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so callers up the stack can still observe the interruption.
+        Thread.currentThread().interrupt();
+        throw new HelixRebalanceException("Failed to execute new best possible calculation.",
+            HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+      } catch (ExecutionException e) {
+        throw new HelixRebalanceException("Failed to execute new best possible calculation.",
+            HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
+      }
+    }
+  }
+
+  /**
+   * Calculate and update the Best Possible assignment.
+   * If the result differs from the stored result, update the best possible assignment cache in the
+   * metadata store with the next version (only if the version is not stale);
+   * if that update succeeds, trigger the pipeline so that main thread logic can run again.
+   */
+  private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
+      Set<String> activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput)
+      throws HelixRebalanceException {
+    LOG.info("Start calculating the new best possible assignment.");
+    _partialRebalanceCounter.increment(1L);
+    _partialRebalanceLatency.startMeasuringLatency();
+
+    int newBestPossibleAssignmentVersion = -1;
+    if (_assignmentMetadataStore != null) {
+      newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1;
+    } else {
+      LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version.");
+    }
+
+    // Read the baseline from metadata store
+    Map<String, ResourceAssignment> currentBaseline =
+        _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
+
+    // Read the best possible assignment from metadata store
+    Map<String, ResourceAssignment> currentBestPossibleAssignment =
+        _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+            resourceMap.keySet());
+    ClusterModel clusterModel;
+    try {
+      clusterModel = ClusterModelProvider
+          .generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes,
+              currentBaseline, currentBestPossibleAssignment);
+    } catch (Exception ex) {
+      throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.",
+          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
+    }
+    Map<String, ResourceAssignment> newAssignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
+
+    // Asynchronously report baseline divergence metric before persisting to metadata store,
+    // just in case if persisting fails, we still have the metric.
+    // To avoid changes of the new assignment and make it safe when being used to measure baseline
+    // divergence, use a deep copy of the new assignment.
+    Map<String, ResourceAssignment> newAssignmentCopy = new HashMap<>();
+    for (Map.Entry<String, ResourceAssignment> entry : newAssignment.entrySet()) {
+      newAssignmentCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord()));
+    }
+
+    _baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(),
+        currentBaseline, newAssignmentCopy);
+
+    boolean bestPossibleUpdateSuccessful = false;
+    if (_assignmentMetadataStore == null) {
+      LOG.debug("Assignment Metadata Store is null. Skip persisting the best possible assignment.");
+    } else if (_assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
+      bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
+          newBestPossibleAssignmentVersion);
+    } else {
+      LOG.debug("Best possible assignment is unchanged. Skip persisting the best possible assignment.");
+    }
+    _partialRebalanceLatency.endMeasuringLatency();
+    LOG.info("Finish calculating the new best possible assignment.");
+
+    if (bestPossibleUpdateSuccessful) {
+      LOG.info("Schedule a new rebalance after the new best possible calculation has finished.");
+      RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
+    }
+  }
+
+  /**
+   * @param isAsyncPartialRebalanceEnabled true to make subsequent {@link #partialRebalance} calls
+   *                                       return without waiting for the calculation.
+   */
+  public void setPartialRebalanceAsyncMode(boolean isAsyncPartialRebalanceEnabled) {
+    _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
+  }
+
+  public boolean isAsyncPartialRebalanceEnabled() {
+    return _asyncPartialRebalanceEnabled;
+  }
+
+  /** Stop the best possible executor, interrupting any running calculation via shutdownNow(). */
+  @Override
+  public void close() {
+    if (_bestPossibleCalculateExecutor != null) {
+      _bestPossibleCalculateExecutor.shutdownNow();
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 3fffef2fb..b43ce4b68 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -28,16 +28,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
@@ -46,11 +40,11 @@ import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.StatefulRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
+import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
-import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
@@ -60,10 +54,8 @@ import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.metrics.MetricCollector;
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
-import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
-import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,12 +69,6 @@ import org.slf4j.LoggerFactory;
public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
- // When any of the following change happens, the rebalancer needs to do a global rebalance which
- // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
- private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
- ImmutableSet
- .of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE,
- HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
// To identify if the preference has been configured or not.
private static final Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer>
NOT_CONFIGURED_PREFERENCE = ImmutableMap
@@ -97,37 +83,25 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
.unmodifiableList(Arrays.asList(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS,
HelixRebalanceException.Type.UNKNOWN_FAILURE));
- // To calculate the baseline asynchronously
- private final ExecutorService _baselineCalculateExecutor;
- private final ExecutorService _bestPossibleCalculateExecutor;
- private final ResourceChangeDetector _changeDetector;
private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
private final AssignmentMetadataStore _assignmentMetadataStore;
private final MetricCollector _metricCollector;
private final CountMetric _rebalanceFailureCount;
- private final CountMetric _baselineCalcCounter;
- private final LatencyMetric _baselineCalcLatency;
private final LatencyMetric _writeLatency;
- private final CountMetric _partialRebalanceCounter;
- private final LatencyMetric _partialRebalanceLatency;
private final CountMetric _emergencyRebalanceCounter;
private final LatencyMetric _emergencyRebalanceLatency;
private final CountMetric _rebalanceOverwriteCounter;
private final LatencyMetric _rebalanceOverwriteLatency;
- private final LatencyMetric _stateReadLatency;
- private final BaselineDivergenceGauge _baselineDivergenceGauge;
-
- private boolean _asyncGlobalRebalanceEnabled;
- private boolean _asyncPartialRebalanceEnabled;
- private Future<Boolean> _asyncPartialRebalanceResult;
+ private final AssignmentManager _assignmentManager;
+ private final PartialRebalanceRunner _partialRebalanceRunner;
+ private final GlobalRebalanceRunner _globalRebalanceRunner;
// Note, the rebalance algorithm field is mutable so it should not be directly referred except for
// the public method computeNewIdealStates.
private RebalanceAlgorithm _rebalanceAlgorithm;
- private Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> _preference =
- NOT_CONFIGURED_PREFERENCE;
+ private Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> _preference = NOT_CONFIGURED_PREFERENCE;
private static AssignmentMetadataStore constructAssignmentStore(String metadataStoreAddrs,
String clusterName) {
@@ -199,20 +173,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
_rebalanceFailureCount = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
CountMetric.class);
- _baselineCalcCounter = _metricCollector.getMetric(
- WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
- CountMetric.class);
- _baselineCalcLatency = _metricCollector.getMetric(
- WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge
- .name(),
- LatencyMetric.class);
- _partialRebalanceCounter = _metricCollector.getMetric(
- WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
- CountMetric.class);
- _partialRebalanceLatency = _metricCollector.getMetric(
- WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
- .name(),
- LatencyMetric.class);
_emergencyRebalanceCounter = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceCounter.name(), CountMetric.class);
_emergencyRebalanceLatency = _metricCollector.getMetric(
@@ -226,29 +186,24 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
_writeLatency = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
LatencyMetric.class);
- _stateReadLatency = _metricCollector.getMetric(
+ _assignmentManager = new AssignmentManager(_metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
- LatencyMetric.class);
- _baselineDivergenceGauge = _metricCollector.getMetric(
- WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
- BaselineDivergenceGauge.class);
+ LatencyMetric.class));
- _changeDetector = new ResourceChangeDetector(true);
-
- _baselineCalculateExecutor = Executors.newSingleThreadExecutor();
- _bestPossibleCalculateExecutor = Executors.newSingleThreadExecutor();
- _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
- _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
+ _partialRebalanceRunner = new PartialRebalanceRunner(_assignmentManager, assignmentMetadataStore, _metricCollector,
+ _rebalanceFailureCount, isAsyncPartialRebalanceEnabled); // same collector field as the metrics above
+ _globalRebalanceRunner = new GlobalRebalanceRunner(_assignmentManager, assignmentMetadataStore, _metricCollector,
+ _writeLatency, _rebalanceFailureCount, isAsyncGlobalRebalanceEnabled); // same collector field as the metrics above
}
// Update the global rebalance mode to be asynchronous or synchronous
public void setGlobalRebalanceAsyncMode(boolean isAsyncGlobalRebalanceEnabled) {
- _asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
+ _globalRebalanceRunner.setGlobalRebalanceAsyncMode(isAsyncGlobalRebalanceEnabled);
}
// Update the partial rebalance mode to be asynchronous or synchronous
public void setPartialRebalanceAsyncMode(boolean isAsyncPartialRebalanceEnabled) {
- _asyncPartialRebalanceEnabled = isAsyncPartialRebalanceEnabled;
+ _partialRebalanceRunner.setPartialRebalanceAsyncMode(isAsyncPartialRebalanceEnabled);
}
// Update the rebalancer preference if the new options are different from the current preference.
@@ -267,18 +222,14 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
if (_assignmentMetadataStore != null) {
_assignmentMetadataStore.reset();
}
- _changeDetector.resetSnapshots();
+ _globalRebalanceRunner.resetChangeDetector();
}
// TODO the rebalancer should reject any other computing request after being closed.
@Override
public void close() {
- if (_baselineCalculateExecutor != null) {
- _baselineCalculateExecutor.shutdownNow();
- }
- if (_bestPossibleCalculateExecutor != null) {
- _bestPossibleCalculateExecutor.shutdownNow();
- }
+ _partialRebalanceRunner.close();
+ _globalRebalanceRunner.close();
if (_assignmentMetadataStore != null) {
_assignmentMetadataStore.close();
}
@@ -315,7 +266,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
// Note that don't return an assignment based on the current state if there is no previously
// calculated result in this fallback logic.
Map<String, ResourceAssignment> assignmentRecord =
- getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(),
+ getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(), // protected hook; keeps overrides effective
resourceMap.keySet());
newIdealStates = convertResourceAssignment(clusterData, assignmentRecord);
}
@@ -368,7 +319,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
if (!activeNodes.equals(clusterData.getEnabledLiveInstances()) && requireRebalanceOverwrite(clusterData,
newBestPossibleAssignment)) {
applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
- getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()), algorithm);
+ _assignmentManager.getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()), algorithm);
}
// Replace the assignment if user-defined preference list is configured.
// Note the user-defined list is intentionally applied to the final mapping after calculation.
@@ -388,7 +339,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
// Perform global rebalance for a new baseline assignment
- globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
+ _globalRebalanceRunner.globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
// Perform emergency rebalance for a new best possible assignment
Map<String, ResourceAssignment> newAssignment =
emergencyRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
@@ -434,209 +385,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
return FAILURE_TYPES_TO_PROPAGATE;
}
- /**
- * Global rebalance calculates for a new baseline assignment.
- * The new baseline assignment will be persisted and leveraged by the partial rebalance.
- * @param clusterData
- * @param resourceMap
- * @param currentStateOutput
- * @param algorithm
- * @throws HelixRebalanceException
- */
- private void globalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
- final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException {
- _changeDetector.updateSnapshots(clusterData);
- // Get all the changed items' information. Filter for the items that have content changed.
- final Map<HelixConstants.ChangeType, Set<String>> clusterChanges = _changeDetector.getAllChanges();
-
- if (clusterChanges.keySet().stream().anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
- final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
- // Calculate the Baseline assignment for global rebalance.
- Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
- try {
- // If the synchronous thread does not wait for the baseline to be calculated, the synchronous thread should
- // be triggered again after baseline is finished.
- // Set shouldTriggerMainPipeline to be !waitForGlobalRebalance
- doGlobalRebalance(clusterData, resourceMap, algorithm, currentStateOutput, !waitForGlobalRebalance,
- clusterChanges);
- } catch (HelixRebalanceException e) {
- if (_asyncGlobalRebalanceEnabled) {
- _rebalanceFailureCount.increment(1L);
- }
- LOG.error("Failed to calculate baseline assignment!", e);
- return false;
- }
- return true;
- });
- if (waitForGlobalRebalance) {
- try {
- if (!result.get()) {
- throw new HelixRebalanceException("Failed to calculate for the new Baseline.",
- HelixRebalanceException.Type.FAILED_TO_CALCULATE);
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new HelixRebalanceException("Failed to execute new Baseline calculation.",
- HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
- }
- }
- }
- }
-
- /**
- * Calculate and update the Baseline assignment
- * @param shouldTriggerMainPipeline True if the call should trigger a following main pipeline rebalance
- * so the new Baseline could be applied to cluster.
- */
- private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
- RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput, boolean shouldTriggerMainPipeline,
- Map<HelixConstants.ChangeType, Set<String>> clusterChanges) throws HelixRebalanceException {
- LOG.info("Start calculating the new baseline.");
- _baselineCalcCounter.increment(1L);
- _baselineCalcLatency.startMeasuringLatency();
-
- // Build the cluster model for rebalance calculation.
- // Note, for a Baseline calculation,
- // 1. Ignore node status (disable/offline).
- // 2. Use the previous Baseline as the only parameter about the previous assignment.
- Map<String, ResourceAssignment> currentBaseline =
- getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
- ClusterModel clusterModel;
- try {
- clusterModel =
- ClusterModelProvider.generateClusterModelForBaseline(clusterData, resourceMap, clusterData.getAllInstances(),
- clusterChanges, currentBaseline);
- } catch (Exception ex) {
- throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
- HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
- }
-
- Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterModel, algorithm);
- boolean isBaselineChanged =
- _assignmentMetadataStore != null && _assignmentMetadataStore.isBaselineChanged(newBaseline);
- // Write the new baseline to metadata store
- if (isBaselineChanged) {
- try {
- _writeLatency.startMeasuringLatency();
- _assignmentMetadataStore.persistBaseline(newBaseline);
- _writeLatency.endMeasuringLatency();
- } catch (Exception ex) {
- throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
- HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
- }
- } else {
- LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
- }
- _baselineCalcLatency.endMeasuringLatency();
- LOG.info("Global baseline calculation completed and has been persisted into metadata store.");
-
- if (isBaselineChanged && shouldTriggerMainPipeline) {
- LOG.info("Schedule a new rebalance after the new baseline calculation has finished.");
- RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
- }
- }
-
- private void partialRebalance(
- ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
- Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
- RebalanceAlgorithm algorithm)
- throws HelixRebalanceException {
- // If partial rebalance is async and the previous result is not completed yet,
- // do not start another partial rebalance.
- if (_asyncPartialRebalanceEnabled && _asyncPartialRebalanceResult != null
- && !_asyncPartialRebalanceResult.isDone()) {
- return;
- }
-
- _asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(() -> {
- try {
- doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
- currentStateOutput);
- } catch (HelixRebalanceException e) {
- if (_asyncPartialRebalanceEnabled) {
- _rebalanceFailureCount.increment(1L);
- }
- LOG.error("Failed to calculate best possible assignment!", e);
- return false;
- }
- return true;
- });
- if (!_asyncPartialRebalanceEnabled) {
- try {
- if (!_asyncPartialRebalanceResult.get()) {
- throw new HelixRebalanceException("Failed to calculate for the new best possible.",
- HelixRebalanceException.Type.FAILED_TO_CALCULATE);
- }
- } catch (InterruptedException | ExecutionException e) {
- throw new HelixRebalanceException("Failed to execute new best possible calculation.",
- HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
- }
- }
- }
-
- /**
- * Calculate and update the Best Possible assignment
- */
- private void doPartialRebalance(ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
- Set<String> activeNodes, RebalanceAlgorithm algorithm, CurrentStateOutput currentStateOutput)
- throws HelixRebalanceException {
- LOG.info("Start calculating the new best possible assignment.");
- _partialRebalanceCounter.increment(1L);
- _partialRebalanceLatency.startMeasuringLatency();
-
- int newBestPossibleAssignmentVersion = -1;
- if (_assignmentMetadataStore != null) {
- newBestPossibleAssignmentVersion = _assignmentMetadataStore.getBestPossibleVersion() + 1;
- } else {
- LOG.debug("Assignment Metadata Store is null. Skip getting best possible assignment version.");
- }
-
- // Read the baseline from metadata store
- Map<String, ResourceAssignment> currentBaseline =
- getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
-
- // Read the best possible assignment from metadata store
- Map<String, ResourceAssignment> currentBestPossibleAssignment =
- getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
- resourceMap.keySet());
- ClusterModel clusterModel;
- try {
- clusterModel = ClusterModelProvider
- .generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes,
- currentBaseline, currentBestPossibleAssignment);
- } catch (Exception ex) {
- throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.",
- HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
- }
- Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterModel, algorithm);
-
- // Asynchronously report baseline divergence metric before persisting to metadata store,
- // just in case if persisting fails, we still have the metric.
- // To avoid changes of the new assignment and make it safe when being used to measure baseline
- // divergence, use a deep copy of the new assignment.
- Map<String, ResourceAssignment> newAssignmentCopy = new HashMap<>();
- for (Map.Entry<String, ResourceAssignment> entry : newAssignment.entrySet()) {
- newAssignmentCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord()));
- }
-
- _baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(),
- currentBaseline, newAssignmentCopy);
-
- boolean bestPossibleUpdateSuccessful = false;
- if (_assignmentMetadataStore != null && _assignmentMetadataStore.isBestPossibleChanged(newAssignment)) {
- bestPossibleUpdateSuccessful = _assignmentMetadataStore.asyncUpdateBestPossibleAssignmentCache(newAssignment,
- newBestPossibleAssignmentVersion);
- } else {
- LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
- }
- _partialRebalanceLatency.endMeasuringLatency();
- LOG.info("Finish calculating the new best possible assignment.");
-
- if (bestPossibleUpdateSuccessful) {
- LOG.info("Schedule a new rebalance after the new best possible calculation has finished.");
- RebalanceUtil.scheduleOnDemandPipeline(clusterData.getClusterName(), 0L, false);
- }
- }
-
protected Map<String, ResourceAssignment> emergencyRebalance(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
@@ -647,7 +395,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
_emergencyRebalanceLatency.startMeasuringLatency();
Map<String, ResourceAssignment> currentBestPossibleAssignment =
- getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+ getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput, // protected hook; delegates to _assignmentManager
resourceMap.keySet());
// Step 1: Check for permanent node down
@@ -677,7 +425,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
throw new HelixRebalanceException("Failed to generate cluster model for emergency rebalance.",
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
- newAssignment = calculateAssignment(clusterModel, algorithm);
+ newAssignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
} else {
newAssignment = currentBestPossibleAssignment;
}
@@ -687,9 +435,9 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
_emergencyRebalanceLatency.endMeasuringLatency();
LOG.info("Finish emergency rebalance");
- partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
- if (!_asyncPartialRebalanceEnabled) {
- newAssignment = getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+ _partialRebalanceRunner.partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
+ if (!_partialRebalanceRunner.isAsyncPartialRebalanceEnabled()) {
+ newAssignment = getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput, // protected hook
resourceMap.keySet());
persistBestPossibleAssignment(newAssignment);
}
@@ -697,24 +445,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
return newAssignment;
}
- /**
- * @param clusterModel the cluster model that contains all the cluster status for the purpose of
- * rebalancing.
- * @return the new optimal assignment for the resources.
- */
- private Map<String, ResourceAssignment> calculateAssignment(ClusterModel clusterModel,
- RebalanceAlgorithm algorithm) throws HelixRebalanceException {
- long startTime = System.currentTimeMillis();
- LOG.info("Start calculating for an assignment with algorithm {}",
- algorithm.getClass().getSimpleName());
- OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel);
- Map<String, ResourceAssignment> newAssignment =
- optimalAssignment.getOptimalResourceAssignment();
- LOG.info("Finish calculating an assignment with algorithm {}. Took: {} ms.",
- algorithm.getClass().getSimpleName(), System.currentTimeMillis() - startTime);
- return newAssignment;
- }
-
// Generate the preference lists from the state mapping based on state priority.
private Map<String, List<String>> getPreferenceLists(ResourceAssignment newAssignment,
Map<String, Integer> statePriorityMap) {
@@ -751,39 +481,6 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
}
}
- /**
- * @param assignmentMetadataStore
- * @param currentStateOutput
- * @param resources
- * @return The current baseline assignment. If record does not exist in the
- * assignmentMetadataStore, return the current state assignment.
- * @throws HelixRebalanceException
- */
- private Map<String, ResourceAssignment> getBaselineAssignment(
- AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
- Set<String> resources) throws HelixRebalanceException {
- Map<String, ResourceAssignment> currentBaseline = new HashMap<>();
- if (assignmentMetadataStore != null) {
- try {
- _stateReadLatency.startMeasuringLatency();
- currentBaseline = new HashMap<>(assignmentMetadataStore.getBaseline());
- _stateReadLatency.endMeasuringLatency();
- } catch (Exception ex) {
- throw new HelixRebalanceException(
- "Failed to get the current baseline assignment because of unexpected error.",
- HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
- }
- }
- currentBaseline.keySet().retainAll(resources);
-
- // For resources without baseline, fall back to current state assignments
- Set<String> missingResources = new HashSet<>(resources);
- missingResources.removeAll(currentBaseline.keySet());
- currentBaseline.putAll(currentStateOutput.getAssignment(missingResources));
-
- return currentBaseline;
- }
-
/**
* @param assignmentMetadataStore
* @param currentStateOutput
@@ -795,26 +492,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
protected Map<String, ResourceAssignment> getBestPossibleAssignment(
AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
Set<String> resources) throws HelixRebalanceException {
- Map<String, ResourceAssignment> currentBestAssignment = new HashMap<>();
- if (assignmentMetadataStore != null) {
- try {
- _stateReadLatency.startMeasuringLatency();
- currentBestAssignment = new HashMap<>(assignmentMetadataStore.getBestPossibleAssignment());
- _stateReadLatency.endMeasuringLatency();
- } catch (Exception ex) {
- throw new HelixRebalanceException(
- "Failed to get the current best possible assignment because of unexpected error.",
- HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
- }
- }
- currentBestAssignment.keySet().retainAll(resources);
-
- // For resources without best possible states, fall back to current state assignments
- Set<String> missingResources = new HashSet<>(resources);
- missingResources.removeAll(currentBestAssignment.keySet());
- currentBestAssignment.putAll(currentStateOutput.getAssignment(missingResources));
-
- return currentBestAssignment;
+ return _assignmentManager.getBestPossibleAssignment(assignmentMetadataStore, currentStateOutput, resources);
}
private void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment)
@@ -917,7 +595,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
Map<String, IdealState> activeIdealStates =
- convertResourceAssignment(clusterData, calculateAssignment(clusterModel, algorithm));
+ convertResourceAssignment(clusterData, WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm));
for (String resourceName : idealStateMap.keySet()) {
// The new calculated ideal state before overwrite
IdealState newIdealState = idealStateMap.get(resourceName);
@@ -968,7 +646,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
}
protected ResourceChangeDetector getChangeDetector() {
- return _changeDetector;
+ return _globalRebalanceRunner.getChangeDetector();
}
@Override
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index d011c5c2e..3344fe14c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -20,12 +20,9 @@ package org.apache.helix.controller.rebalancer.waged;
*/
import java.io.IOException;
-import java.sql.Array;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -35,7 +32,6 @@ import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
@@ -55,7 +51,6 @@ import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;