You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/07 21:12:13 UTC
[helix] 10/37: Refine the WAGED rebalancer related interfaces for
integration (#431)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git
commit c3d8501e1fea597be9bb7fbfee914ced0e3869a9
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Wed Aug 28 22:28:10 2019 -0700
Refine the WAGED rebalancer related interfaces for integration (#431)
* Refine the WAGED rebalancer related interfaces and initially integrate them with the BestPossibleStateCalcStage.
- Modify the BestPossibleStateCalcStage logic to plug in the WAGED rebalancer.
- Refine ClusterModel to integrate with the ClusterDataDetector implementation.
- Enable getting the changed details for Cluster Config in the change detector, which is required by the WAGED rebalancer.
---
.../org/apache/helix/HelixRebalanceException.java | 43 ++++++++++
.../changedetector/ResourceChangeDetector.java | 20 +++--
.../controller/rebalancer/GlobalRebalancer.java | 67 ---------------
.../rebalancer/waged/ClusterDataDetector.java | 73 ----------------
.../rebalancer/waged/ClusterDataProvider.java | 54 ------------
.../rebalancer/waged/WagedRebalancer.java | 65 ++++++++++----
.../waged/model/ClusterModelProvider.java | 25 +++---
.../stages/BestPossibleStateCalcStage.java | 98 ++++++++++++++++------
.../waged/model/TestClusterModelProvider.java | 6 +-
9 files changed, 196 insertions(+), 255 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
new file mode 100644
index 0000000..c01b173
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
@@ -0,0 +1,43 @@
+package org.apache.helix;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Exception thrown by Helix due to rebalance failures.
+ */
+public class HelixRebalanceException extends Exception {
+  public enum RebalanceFailureType { // public: named by the public constructor and getter below
+    INVALID_CLUSTER_STATUS,
+    INVALID_REBALANCER_STATUS,
+    FAILED_TO_CALCULATE,
+    UNKNOWN_FAILURE
+  }
+
+  private final RebalanceFailureType _type; // failure category, fixed at construction
+
+  public HelixRebalanceException(String message, RebalanceFailureType type, Throwable cause) {
+    super(String.format("%s. Failure Type: %s", message, type.name()), cause); // type is appended to the message
+    _type = type;
+  }
+
+  public RebalanceFailureType getFailureType() { // returns the type passed to the constructor
+    return _type;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
index d65e609..611f4b2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
@@ -20,15 +20,17 @@ package org.apache.helix.controller.changedetector;
*/
import com.google.common.collect.Sets;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import org.apache.helix.HelixConstants;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
/**
* ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from
@@ -37,6 +39,7 @@ import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
* WARNING: the methods of this class are not thread-safe.
*/
public class ResourceChangeDetector implements ChangeDetector {
+ private static final Logger LOG = LoggerFactory.getLogger(ResourceChangeDetector.class.getName());
private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run
private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run
@@ -108,10 +111,13 @@ public class ResourceChangeDetector implements ChangeDetector {
return snapshot.getResourceConfigMap();
case LIVE_INSTANCE:
return snapshot.getLiveInstances();
+ case CONFIG:
+ return Collections.emptyMap();
default:
- throw new HelixException(String.format(
- "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: %s",
- changeType));
+ LOG.warn(
+ "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: {}",
+ changeType);
+ return Collections.emptyMap();
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java
deleted file mode 100644
index a3b9b32..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.apache.helix.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixManager;
-import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
-import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Resource;
-
-import java.util.Map;
-
-public interface GlobalRebalancer<T extends BaseControllerDataProvider> {
- enum RebalanceFailureType {
- INVALID_CLUSTER_STATUS,
- INVALID_REBALANCER_STATUS,
- FAILED_TO_CALCULATE,
- UNKNOWN_FAILURE
- }
-
- class RebalanceFailureReason {
- private final static String DEFAULT_REASON_MESSAGE = "No detail";
- private final RebalanceFailureType _type;
- private final String _reason;
-
- public RebalanceFailureReason(RebalanceFailureType type) {
- this(type, DEFAULT_REASON_MESSAGE);
- }
-
- public RebalanceFailureReason(RebalanceFailureType type, String reason) {
- _type = type;
- _reason = reason;
- }
-
- public RebalanceFailureType get_type() {
- return _type;
- }
-
- public String get_reason() {
- return _reason;
- }
- }
-
- void init(HelixManager manager);
-
- Map<String, IdealState> computeNewIdealState(final CurrentStateOutput currentStateOutput,
- T clusterData, Map<String, Resource> resourceMap);
-
- RebalanceFailureReason getFailureReason();
-}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
deleted file mode 100644
index 0423edf..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.helix.controller.rebalancer.waged;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A placeholder before we have the Cluster Data Detector implemented.
- *
- * @param <T> The cache class that can be handled by the detector.
- */
-public class ClusterDataDetector<T extends BaseControllerDataProvider> {
- /**
- * All the cluster change type that may trigger a WAGED rebalancer re-calculation.
- */
- public enum ChangeType {
- BaselineAssignmentChange,
- InstanceConfigChange,
- ClusterConfigChange,
- ResourceConfigChange,
- ResourceIdealStatesChange,
- InstanceStateChange,
- OtherChange
- }
-
- private Map<ChangeType, Set<String>> _currentChanges =
- Collections.singletonMap(ChangeType.ClusterConfigChange, Collections.emptySet());
-
- public void updateClusterStatus(T cache) {
- }
-
- /**
- * Returns all change types detected during the ClusterDetection stage.
- */
- public Set<ChangeType> getChangeTypes() {
- return _currentChanges.keySet();
- }
-
- /**
- * Returns a set of the names of components that changed based on the given change type.
- */
- public Set<String> getChangesBasedOnType(ChangeType changeType) {
- return _currentChanges.get(changeType);
- }
-
- /**
- * Return a map of the change details <type, change details>.
- */
- public Map<ChangeType, Set<String>> getAllChanges() {
- return _currentChanges;
- }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java
deleted file mode 100644
index 387666c..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.helix.controller.rebalancer.waged;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
-import org.apache.helix.model.ResourceAssignment;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A placeholder before we have the implementation.
- *
- * The data provider generates the Cluster Model based on the controller's data cache.
- */
-public class ClusterDataProvider {
-
- /**
- * @param dataProvider The controller's data cache.
- * @param activeInstances The logical active instances that will be used in the calculation. Note
- * This list can be different from the real active node list according to
- * the rebalancer logic.
- * @param clusterChanges All the cluster changes that happened after the previous rebalance.
- * @param baselineAssignment The persisted Baseline assignment.
- * @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the
- * previous rebalance.
- * @return The cluster model as the input for the upcoming rebalance.
- */
- protected static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
- Set<String> activeInstances, Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
- Map<String, ResourceAssignment> baselineAssignment,
- Map<String, ResourceAssignment> bestPossibleAssignment) {
- // TODO finish the implementation.
- return null;
- }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index aa3cfee..fd740e6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -19,10 +19,13 @@ package org.apache.helix.controller.rebalancer.waged;
* under the License.
*/
-import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
+import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.controller.changedetector.ResourceChangeDetector;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.GlobalRebalancer;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintsRebalanceAlgorithm;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Resource;
@@ -36,23 +39,57 @@ import java.util.Map;
* A placeholder before we have the implementation.
* Weight-Aware Globally-Even Distribute Rebalancer.
*
- * @see <a href="Design Document">https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer</a>
+ * @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
+ * Design Document
+ * </a>
*/
-public class WagedRebalancer implements GlobalRebalancer<ResourceControllerDataProvider> {
+public class WagedRebalancer {
private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
- @Override
- public void init(HelixManager manager) { }
+ // --------- The following fields are placeholders and need replacement. -----------//
+ // TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization?
+ private final AssignmentMetadataStore _assignmentMetadataStore;
+ private final RebalanceAlgorithm _rebalanceAlgorithm;
+ // ------------------------------------------------------------------------------------//
- @Override
- public Map<String, IdealState> computeNewIdealState(CurrentStateOutput currentStateOutput,
- ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap)
- throws HelixException {
- return new HashMap<>();
+ // The cluster change detector is a stateful object. Make it static to avoid unnecessary
+ // reinitialization.
+ private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
+ new ThreadLocal<>();
+ private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
+
+ private ResourceChangeDetector getChangeDetector() { // returns the calling thread's detector
+ if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) { // nothing stored yet for this thread
+ CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector()); // create it lazily on first use
+ }
+ return CHANGE_DETECTOR_THREAD_LOCAL.get();
+ }
+
+ public WagedRebalancer(HelixManager helixManager) {
+ // TODO init the metadata store according to their requirement when integrate, or change to final static method if possible.
+ _assignmentMetadataStore = new AssignmentMetadataStore();
+ // TODO init the algorithm according to the requirement when integrate.
+ _rebalanceAlgorithm = new ConstraintsRebalanceAlgorithm();
+
+ // Use the mapping calculator in DelayedAutoRebalancer for calculating the final assignment
+ // output.
+ // This calculator will translate the best possible assignment into an applicable state mapping
+ // based on the current states.
+ // TODO abstract and separate the mapping calculator logic from the DelayedAutoRebalancer
+ _mappingCalculator = new DelayedAutoRebalancer();
}
- @Override
- public RebalanceFailureReason getFailureReason() {
- return new RebalanceFailureReason(RebalanceFailureType.UNKNOWN_FAILURE);
+ /**
+ * Compute the new IdealStates for all the resources input. The IdealStates include both the new
+ * partition assignment (in the listFields) and the new replica state mapping (in the mapFields).
+ * @param clusterData The Cluster status data provider.
+ * @param resourceMap A map containing all the rebalancing resources.
+ * @param currentStateOutput The present Current State of the cluster.
+ * @return A map containing the computed new IdealStates.
+ */
+ public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
+ Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
+ throws HelixRebalanceException {
+ return new HashMap<>();
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 9de023b..c4f7d02 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -19,9 +19,9 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -59,7 +59,7 @@ public class ClusterModelProvider {
*/
public static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
Map<String, Resource> resourceMap, Set<String> activeInstances,
- Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
+ Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
Map<String, ResourceAssignment> baselineAssignment,
Map<String, ResourceAssignment> bestPossibleAssignment) {
// Generate replica objects for all the resource partitions.
@@ -108,14 +108,13 @@ public class ClusterModelProvider {
*/
private static Set<AssignableReplica> findToBeAssignedReplicas(
Map<String, Set<AssignableReplica>> replicaMap,
- Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges, Set<String> activeInstances,
+ Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Set<String> activeInstances,
Map<String, ResourceAssignment> bestPossibleAssignment,
Map<String, Set<AssignableReplica>> allocatedReplicas) {
Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
- if (clusterChanges.containsKey(ClusterDataDetector.ChangeType.ClusterConfigChange)
- || clusterChanges.containsKey(ClusterDataDetector.ChangeType.InstanceConfigChange)
- || clusterChanges.containsKey(ClusterDataDetector.ChangeType.BaselineAssignmentChange)) {
- // If the cluster topology or baseline assignment has been modified, need to reassign all replicas
+ if (clusterChanges.containsKey(HelixConstants.ChangeType.CONFIG)
+ || clusterChanges.containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
+ // If the cluster topology has been modified, need to reassign all replicas
toBeAssignedReplicas
.addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
} else {
@@ -124,11 +123,13 @@ public class ClusterModelProvider {
Set<AssignableReplica> replicas = replicaMap.get(resourceName);
// 1. if the resource config/idealstate is changed, need to reassign.
// 2. if the resource does not appear in the best possible assignment, need to reassign.
- if (clusterChanges.getOrDefault(ClusterDataDetector.ChangeType.ResourceConfigChange,
- Collections.emptySet()).contains(resourceName) || clusterChanges
- .getOrDefault(ClusterDataDetector.ChangeType.ResourceIdealStatesChange,
- Collections.emptySet()).contains(resourceName) || !bestPossibleAssignment
- .containsKey(resourceName)) {
+ if (clusterChanges
+ .getOrDefault(HelixConstants.ChangeType.RESOURCE_CONFIG, Collections.emptySet())
+ .contains(resourceName)
+ || clusterChanges
+ .getOrDefault(HelixConstants.ChangeType.IDEAL_STATE, Collections.emptySet())
+ .contains(resourceName)
+ || !bestPossibleAssignment.containsKey(resourceName)) {
toBeAssignedReplicas.addAll(replicas);
continue; // go to check next resource
} else {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 49a72e0..ba4da88 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
*/
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -27,6 +28,7 @@ import java.util.concurrent.Callable;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
+import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -37,6 +39,7 @@ import org.apache.helix.controller.rebalancer.MaintenanceRebalancer;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MaintenanceSignal;
@@ -56,18 +59,19 @@ import org.slf4j.LoggerFactory;
* IdealState,StateModel,LiveInstance
*/
public class BestPossibleStateCalcStage extends AbstractBaseStage {
- private static final Logger logger = LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
+ private static final Logger logger =
+ LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
@Override
public void process(ClusterEvent event) throws Exception {
_eventId = event.getEventId();
- CurrentStateOutput currentStateOutput =
- event.getAttribute(AttributeName.CURRENT_STATE.name());
+ CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
final Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
final ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
- ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
+ ResourceControllerDataProvider cache =
+ event.getAttribute(AttributeName.ControllerDataProvider.name());
if (currentStateOutput == null || resourceMap == null || cache == null) {
throw new StageException(
@@ -90,8 +94,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
resourceMap, stateModelDefMap);
}
} catch (Exception e) {
- LogUtil
- .logError(logger, _eventId, "Could not update cluster status metrics!", e);
+ LogUtil.logError(logger, _eventId, "Could not update cluster status metrics!", e);
}
return null;
}
@@ -100,7 +103,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
CurrentStateOutput currentStateOutput) {
- ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
+ ResourceControllerDataProvider cache =
+ event.getAttribute(AttributeName.ControllerDataProvider.name());
BestPossibleStateOutput output = new BestPossibleStateOutput();
HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name());
@@ -112,19 +116,50 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
boolean isValid = validateOfflineInstancesLimit(cache,
(HelixManager) event.getAttribute(AttributeName.helixmanager.name()));
+ // 1. Rebalance with the WAGED rebalancer
+ // The rebalancer only calculates the new ideal assignment for all the resources that are
+ // configured to use the WAGED rebalancer.
+ // For the other resources, the legacy rebalancers will be triggered in the next step.
+ Map<String, IdealState> newIdealStates = new HashMap<>();
+ WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager);
+ try {
+ newIdealStates
+ .putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput));
+ } catch (HelixRebalanceException ex) {
+ // Note that unlike the legacy rebalancer, the WAGED rebalancer won't return a partial result.
+ // Since it calculates for all the eligible resources globally, a partial result is invalid.
+ // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring.
+ LogUtil.logError(logger, _eventId, String
+ .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s",
+ wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex);
+ }
+
final List<String> failureResources = new ArrayList<>();
Iterator<Resource> itr = resourceMap.values().iterator();
while (itr.hasNext()) {
Resource resource = itr.next();
boolean result = false;
- try {
- result =
- computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output);
- } catch (HelixException ex) {
- LogUtil.logError(logger, _eventId,
- "Exception when calculating best possible states for " + resource.getResourceName(),
- ex);
-
+ IdealState is = newIdealStates.get(resource.getResourceName());
+ if (is != null) {
+ // 2. Check if the WAGED rebalancer has calculated for this resource or not.
+ result = checkBestPossibleStateCalculation(is);
+ if (result) {
+ // The WAGED rebalancer calculates a valid result, record in the output
+ updateBestPossibleStateOutput(output, resource, is);
+ }
+ } else {
+ // 3. The WAGED rebalancer skips calculating the resource assignment, fallback to use a
+ // legacy resource rebalancer if applicable.
+ // If this calculation fails, the resource will be reported in the failureResources list.
+ try {
+ result =
+ computeSingleResourceBestPossibleState(event, cache, currentStateOutput, resource,
+ output);
+ } catch (HelixException ex) {
+ LogUtil.logError(logger, _eventId,
+ "Exception when calculating best possible states for " + resource.getResourceName(),
+ ex);
+ }
}
if (!result) {
failureResources.add(resource.getResourceName());
@@ -185,8 +220,9 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
if (manager != null) {
if (manager.getHelixDataAccessor()
.getProperty(manager.getHelixDataAccessor().keyBuilder().maintenance()) == null) {
- manager.getClusterManagmentTool().autoEnableMaintenanceMode(manager.getClusterName(),
- true, errMsg, MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED);
+ manager.getClusterManagmentTool()
+ .autoEnableMaintenanceMode(manager.getClusterName(), true, errMsg,
+ MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED);
LogUtil.logWarn(logger, _eventId, errMsg);
}
} else {
@@ -199,8 +235,19 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
return true;
}
- private boolean computeResourceBestPossibleState(ClusterEvent event, ResourceControllerDataProvider cache,
- CurrentStateOutput currentStateOutput, Resource resource, BestPossibleStateOutput output) {
+ private void updateBestPossibleStateOutput(BestPossibleStateOutput output, Resource resource,
+ IdealState computedIdealState) { // copies one resource's computed IdealState into the stage output
+ output.setPreferenceLists(resource.getResourceName(), computedIdealState.getPreferenceLists());
+ for (Partition partition : resource.getPartitions()) { // one state map per partition of the resource
+ Map<String, String> newStateMap =
+ computedIdealState.getInstanceStateMap(partition.getPartitionName());
+ output.setState(resource.getResourceName(), partition, newStateMap); // NOTE(review): map may be null if the IdealState lacks this partition; confirm
+ }
+ }
+
+ private boolean computeSingleResourceBestPossibleState(ClusterEvent event,
+ ResourceControllerDataProvider cache, CurrentStateOutput currentStateOutput,
+ Resource resource, BestPossibleStateOutput output) {
// for each ideal state
// read the state model def
// for each resource
@@ -229,12 +276,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
Rebalancer<ResourceControllerDataProvider> rebalancer =
getRebalancer(idealState, resourceName, cache.isMaintenanceModeEnabled());
- MappingCalculator<ResourceControllerDataProvider> mappingCalculator = getMappingCalculator(rebalancer, resourceName);
+ MappingCalculator<ResourceControllerDataProvider> mappingCalculator =
+ getMappingCalculator(rebalancer, resourceName);
if (rebalancer == null || mappingCalculator == null) {
- LogUtil.logError(logger, _eventId,
- "Error computing assignment for resource " + resourceName + ". no rebalancer found. rebalancer: " + rebalancer
- + " mappingCalculator: " + mappingCalculator);
+ LogUtil.logError(logger, _eventId, "Error computing assignment for resource " + resourceName
+ + ". no rebalancer found. rebalancer: " + rebalancer + " mappingCalculator: "
+ + mappingCalculator);
}
if (rebalancer != null && mappingCalculator != null) {
@@ -299,8 +347,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
}
}
- private Rebalancer<ResourceControllerDataProvider> getRebalancer(IdealState idealState, String resourceName,
- boolean isMaintenanceModeEnabled) {
+ private Rebalancer<ResourceControllerDataProvider> getRebalancer(IdealState idealState,
+ String resourceName, boolean isMaintenanceModeEnabled) {
Rebalancer<ResourceControllerDataProvider> customizedRebalancer = null;
String rebalancerClassName = idealState.getRebalancerClassName();
if (rebalancerClassName != null) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index f92a66c..1221b6f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -19,8 +19,8 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
+import org.apache.helix.HelixConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
@@ -177,7 +177,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
// 5. test with best possible assignment but cluster topology is changed
clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
.collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
- _instances, Collections.singletonMap(ClusterDataDetector.ChangeType.ClusterConfigChange,
+ _instances, Collections.singletonMap(HelixConstants.ChangeType.CONFIG,
Collections.emptySet()), Collections.emptyMap(), bestPossibleAssignment);
// There should be no existing assignment since the topology change invalidates all existing assignment
Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
@@ -194,7 +194,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
String changedResourceName = _resourceNames.get(0);
clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
.collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
- _instances, Collections.singletonMap(ClusterDataDetector.ChangeType.ResourceConfigChange,
+ _instances, Collections.singletonMap(HelixConstants.ChangeType.RESOURCE_CONFIG,
Collections.singleton(changedResourceName)), Collections.emptyMap(),
bestPossibleAssignment);
// There should be no existing assignment for all the resource except for resource2.