You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:32:49 UTC
[helix] 11/50: Revert "Refine the WAGED rebalancer related
interfaces for integration (#431)" (#437)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
commit cd85e673ae8c2f4ecbac537905c0fab144d68f4c
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Wed Aug 28 22:47:58 2019 -0700
Revert "Refine the WAGED rebalancer related interfaces for integration (#431)" (#437)
This reverts commit 08a2015c617ddd3c93525afc572081a7836f9476.
---
.../org/apache/helix/HelixRebalanceException.java | 43 ----------
.../changedetector/ResourceChangeDetector.java | 20 ++---
.../controller/rebalancer/GlobalRebalancer.java | 67 +++++++++++++++
.../rebalancer/waged/ClusterDataDetector.java | 73 ++++++++++++++++
.../rebalancer/waged/ClusterDataProvider.java | 54 ++++++++++++
.../rebalancer/waged/WagedRebalancer.java | 65 ++++----------
.../waged/model/ClusterModelProvider.java | 25 +++---
.../stages/BestPossibleStateCalcStage.java | 98 ++++++----------------
.../waged/model/TestClusterModelProvider.java | 6 +-
9 files changed, 255 insertions(+), 196 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
deleted file mode 100644
index c01b173..0000000
--- a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.helix;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Exception thrown by Helix due to rebalance failures.
- */
-public class HelixRebalanceException extends Exception {
- enum RebalanceFailureType {
- INVALID_CLUSTER_STATUS,
- INVALID_REBALANCER_STATUS,
- FAILED_TO_CALCULATE,
- UNKNOWN_FAILURE
- }
-
- private final RebalanceFailureType _type;
-
- public HelixRebalanceException(String message, RebalanceFailureType type, Throwable cause) {
- super(String.format("%s. Failure Type: %s", message, type.name()), cause);
- _type = type;
- }
-
- public RebalanceFailureType getFailureType() {
- return _type;
- }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
index 611f4b2..d65e609 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
@@ -20,17 +20,15 @@ package org.apache.helix.controller.changedetector;
*/
import com.google.common.collect.Sets;
-import org.apache.helix.HelixConstants;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
/**
* ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from
@@ -39,7 +37,6 @@ import java.util.Map;
* WARNING: the methods of this class are not thread-safe.
*/
public class ResourceChangeDetector implements ChangeDetector {
- private static final Logger LOG = LoggerFactory.getLogger(ResourceChangeDetector.class.getName());
private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run
private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run
@@ -111,13 +108,10 @@ public class ResourceChangeDetector implements ChangeDetector {
return snapshot.getResourceConfigMap();
case LIVE_INSTANCE:
return snapshot.getLiveInstances();
- case CONFIG:
- return Collections.emptyMap();
default:
- LOG.warn(
- "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: {}",
- changeType);
- return Collections.emptyMap();
+ throw new HelixException(String.format(
+ "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: %s",
+ changeType));
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java
new file mode 100644
index 0000000..a3b9b32
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java
@@ -0,0 +1,67 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+
+import java.util.Map;
+
+/**
+ * Contract for a rebalancer that computes the IdealStates of a whole set of resources in a
+ * single call, and that exposes a failure reason through {@link #getFailureReason()}.
+ *
+ * @param <T> The controller data provider (cache) type that the rebalancer reads from.
+ */
+public interface GlobalRebalancer<T extends BaseControllerDataProvider> {
+  /**
+   * Categories of rebalance failure carried by {@link RebalanceFailureReason}.
+   */
+  enum RebalanceFailureType {
+    INVALID_CLUSTER_STATUS,
+    INVALID_REBALANCER_STATUS,
+    FAILED_TO_CALCULATE,
+    UNKNOWN_FAILURE
+  }
+
+  /**
+   * Immutable pair of a failure type and a human-readable reason message.
+   */
+  class RebalanceFailureReason {
+    // Message used when the caller supplies only a failure type.
+    private final static String DEFAULT_REASON_MESSAGE = "No detail";
+    private final RebalanceFailureType _type;
+    private final String _reason;
+
+    /**
+     * Creates a failure reason with the default message {@code "No detail"}.
+     *
+     * @param type The failure category.
+     */
+    public RebalanceFailureReason(RebalanceFailureType type) {
+      this(type, DEFAULT_REASON_MESSAGE);
+    }
+
+    /**
+     * Creates a failure reason with an explicit message.
+     *
+     * @param type The failure category.
+     * @param reason The detail message; stored as given, without a null check.
+     */
+    public RebalanceFailureReason(RebalanceFailureType type, String reason) {
+      _type = type;
+      _reason = reason;
+    }
+
+    /**
+     * @return The failure category passed to the constructor.
+     */
+    public RebalanceFailureType get_type() {
+      return _type;
+    }
+
+    /**
+     * @return The detail message passed to the constructor, or {@code "No detail"} if none was
+     *         supplied.
+     */
+    public String get_reason() {
+      return _reason;
+    }
+  }
+
+  /**
+   * Initializes the rebalancer.
+   *
+   * @param manager The Helix manager made available to the rebalancer.
+   */
+  void init(HelixManager manager);
+
+  /**
+   * Computes the new IdealStates for the given resources.
+   *
+   * @param currentStateOutput The current state of the cluster.
+   * @param clusterData The controller's data provider.
+   * @param resourceMap The resources to compute for; presumably keyed by resource name (TODO
+   *                    confirm against callers).
+   * @return The computed IdealStates; presumably keyed by resource name (TODO confirm).
+   */
+  Map<String, IdealState> computeNewIdealState(final CurrentStateOutput currentStateOutput,
+      T clusterData, Map<String, Resource> resourceMap);
+
+  /**
+   * @return The failure reason reported by this rebalancer. NOTE(review): the interface does not
+   *         state what is returned when no failure has occurred; confirm with implementations.
+   */
+  RebalanceFailureReason getFailureReason();
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
new file mode 100644
index 0000000..0423edf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
@@ -0,0 +1,73 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A placeholder until the Cluster Data Detector is implemented.
+ *
+ * In its current form the detector performs no detection: {@link #updateClusterStatus} is a
+ * no-op, and the reported changes are fixed to a single {@link ChangeType#ClusterConfigChange}
+ * entry with an empty set of names.
+ *
+ * @param <T> The cache class that can be handled by the detector.
+ */
+public class ClusterDataDetector<T extends BaseControllerDataProvider> {
+  /**
+   * All the cluster change types that may trigger a WAGED rebalancer re-calculation.
+   */
+  public enum ChangeType {
+    BaselineAssignmentChange,
+    InstanceConfigChange,
+    ClusterConfigChange,
+    ResourceConfigChange,
+    ResourceIdealStatesChange,
+    InstanceStateChange,
+    OtherChange
+  }
+
+  // Placeholder value: always reports a cluster config change with no named components.
+  private Map<ChangeType, Set<String>> _currentChanges =
+      Collections.singletonMap(ChangeType.ClusterConfigChange, Collections.emptySet());
+
+  /**
+   * Refreshes the detected changes from the given cache. Not implemented yet: this is a no-op.
+   *
+   * @param cache The controller data cache to inspect.
+   */
+  public void updateClusterStatus(T cache) {
+  }
+
+  /**
+   * Returns all change types detected during the ClusterDetection stage.
+   *
+   * @return The key set of the currently recorded changes.
+   */
+  public Set<ChangeType> getChangeTypes() {
+    return _currentChanges.keySet();
+  }
+
+  /**
+   * Returns a set of the names of components that changed based on the given change type.
+   *
+   * @param changeType The change type to look up.
+   * @return The names of the changed components, or an empty set (never {@code null}) if no
+   *         change of the given type has been recorded.
+   */
+  public Set<String> getChangesBasedOnType(ChangeType changeType) {
+    // Absent types map to an empty set so that callers can iterate or call contains() without a
+    // null check.
+    return _currentChanges.getOrDefault(changeType, Collections.emptySet());
+  }
+
+  /**
+   * Returns a map of the change details, keyed by change type; each value is the set of names
+   * of the components that changed.
+   *
+   * @return The currently recorded changes.
+   */
+  public Map<ChangeType, Set<String>> getAllChanges() {
+    return _currentChanges;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java
new file mode 100644
index 0000000..387666c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java
@@ -0,0 +1,54 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.model.ResourceAssignment;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A placeholder until the real implementation is available.
+ *
+ * The data provider is meant to generate the Cluster Model based on the controller's data cache.
+ * At present {@link #generateClusterModel} is a stub that returns {@code null}.
+ */
+public class ClusterDataProvider {
+
+  /**
+   * Generates the cluster model for the upcoming rebalance. Not implemented yet: the method
+   * currently ignores all of its arguments and returns {@code null}.
+   *
+   * @param dataProvider           The controller's data cache.
+   * @param activeInstances        The logical active instances that will be used in the calculation.
+   *                               Note: this set can be different from the real active node list,
+   *                               according to the rebalancer logic.
+   * @param clusterChanges         All the cluster changes that happened after the previous rebalance.
+   * @param baselineAssignment     The persisted Baseline assignment.
+   * @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the
+   *                               previous rebalance.
+   * @return The cluster model as the input for the upcoming rebalance; currently always
+   *         {@code null}.
+   */
+  protected static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
+      Set<String> activeInstances, Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
+      Map<String, ResourceAssignment> baselineAssignment,
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // TODO finish the implementation.
+    return null;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index fd740e6..aa3cfee 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -19,13 +19,10 @@ package org.apache.helix.controller.rebalancer.waged;
* under the License.
*/
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixRebalanceException;
-import org.apache.helix.controller.changedetector.ResourceChangeDetector;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
-import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
-import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintsRebalanceAlgorithm;
+import org.apache.helix.controller.rebalancer.GlobalRebalancer;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Resource;
@@ -39,57 +36,23 @@ import java.util.Map;
* A placeholder before we have the implementation.
* Weight-Aware Globally-Even Distribute Rebalancer.
*
- * @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
- * Design Document
- * </a>
+ * @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">Design Document</a>
*/
-public class WagedRebalancer {
+public class WagedRebalancer implements GlobalRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
- // --------- The following fields are placeholders and need replacement. -----------//
- // TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization?
- private final AssignmentMetadataStore _assignmentMetadataStore;
- private final RebalanceAlgorithm _rebalanceAlgorithm;
- // ------------------------------------------------------------------------------------//
+ @Override
+ public void init(HelixManager manager) { }
- // The cluster change detector is a stateful object. Make it static to avoid unnecessary
- // reinitialization.
- private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
- new ThreadLocal<>();
- private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
-
- private ResourceChangeDetector getChangeDetector() {
- if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) {
- CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector());
- }
- return CHANGE_DETECTOR_THREAD_LOCAL.get();
- }
-
- public WagedRebalancer(HelixManager helixManager) {
- // TODO init the metadata store according to their requirement when integrate, or change to final static method if possible.
- _assignmentMetadataStore = new AssignmentMetadataStore();
- // TODO init the algorithm according to the requirement when integrate.
- _rebalanceAlgorithm = new ConstraintsRebalanceAlgorithm();
-
- // Use the mapping calculator in DelayedAutoRebalancer for calculating the final assignment
- // output.
- // This calculator will translate the best possible assignment into an applicable state mapping
- // based on the current states.
- // TODO abstract and separate the mapping calculator logic from the DelayedAutoRebalancer
- _mappingCalculator = new DelayedAutoRebalancer();
+ @Override
+ public Map<String, IdealState> computeNewIdealState(CurrentStateOutput currentStateOutput,
+ ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap)
+ throws HelixException {
+ return new HashMap<>();
}
- /**
- * Compute the new IdealStates for all the resources input. The IdealStates include both the new
- * partition assignment (in the listFiles) and the new replica state mapping (in the mapFields).
- * @param clusterData The Cluster status data provider.
- * @param resourceMap A map containing all the rebalancing resources.
- * @param currentStateOutput The present Current State of the cluster.
- * @return A map containing the computed new IdealStates.
- */
- public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
- Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
- throws HelixRebalanceException {
- return new HashMap<>();
+ @Override
+ public RebalanceFailureReason getFailureReason() {
+ return new RebalanceFailureReason(RebalanceFailureType.UNKNOWN_FAILURE);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index c4f7d02..9de023b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -19,9 +19,9 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
-import org.apache.helix.HelixConstants;
import org.apache.helix.HelixException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -59,7 +59,7 @@ public class ClusterModelProvider {
*/
public static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
Map<String, Resource> resourceMap, Set<String> activeInstances,
- Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
+ Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
Map<String, ResourceAssignment> baselineAssignment,
Map<String, ResourceAssignment> bestPossibleAssignment) {
// Generate replica objects for all the resource partitions.
@@ -108,13 +108,14 @@ public class ClusterModelProvider {
*/
private static Set<AssignableReplica> findToBeAssignedReplicas(
Map<String, Set<AssignableReplica>> replicaMap,
- Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Set<String> activeInstances,
+ Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges, Set<String> activeInstances,
Map<String, ResourceAssignment> bestPossibleAssignment,
Map<String, Set<AssignableReplica>> allocatedReplicas) {
Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
- if (clusterChanges.containsKey(HelixConstants.ChangeType.CONFIG)
- || clusterChanges.containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
- // If the cluster topology has been modified, need to reassign all replicas
+ if (clusterChanges.containsKey(ClusterDataDetector.ChangeType.ClusterConfigChange)
+ || clusterChanges.containsKey(ClusterDataDetector.ChangeType.InstanceConfigChange)
+ || clusterChanges.containsKey(ClusterDataDetector.ChangeType.BaselineAssignmentChange)) {
+ // If the cluster topology or baseline assignment has been modified, need to reassign all replicas
toBeAssignedReplicas
.addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
} else {
@@ -123,13 +124,11 @@ public class ClusterModelProvider {
Set<AssignableReplica> replicas = replicaMap.get(resourceName);
// 1. if the resource config/idealstate is changed, need to reassign.
// 2. if the resource does not appear in the best possible assignment, need to reassign.
- if (clusterChanges
- .getOrDefault(HelixConstants.ChangeType.RESOURCE_CONFIG, Collections.emptySet())
- .contains(resourceName)
- || clusterChanges
- .getOrDefault(HelixConstants.ChangeType.IDEAL_STATE, Collections.emptySet())
- .contains(resourceName)
- || !bestPossibleAssignment.containsKey(resourceName)) {
+ if (clusterChanges.getOrDefault(ClusterDataDetector.ChangeType.ResourceConfigChange,
+ Collections.emptySet()).contains(resourceName) || clusterChanges
+ .getOrDefault(ClusterDataDetector.ChangeType.ResourceIdealStatesChange,
+ Collections.emptySet()).contains(resourceName) || !bestPossibleAssignment
+ .containsKey(resourceName)) {
toBeAssignedReplicas.addAll(replicas);
continue; // go to check next resource
} else {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index ba4da88..49a72e0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.stages;
*/
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -28,7 +27,6 @@ import java.util.concurrent.Callable;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -39,7 +37,6 @@ import org.apache.helix.controller.rebalancer.MaintenanceRebalancer;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
-import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MaintenanceSignal;
@@ -59,19 +56,18 @@ import org.slf4j.LoggerFactory;
* IdealState,StateModel,LiveInstance
*/
public class BestPossibleStateCalcStage extends AbstractBaseStage {
- private static final Logger logger =
- LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
+ private static final Logger logger = LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
@Override
public void process(ClusterEvent event) throws Exception {
_eventId = event.getEventId();
- CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
+ CurrentStateOutput currentStateOutput =
+ event.getAttribute(AttributeName.CURRENT_STATE.name());
final Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
final ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
- ResourceControllerDataProvider cache =
- event.getAttribute(AttributeName.ControllerDataProvider.name());
+ ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
if (currentStateOutput == null || resourceMap == null || cache == null) {
throw new StageException(
@@ -94,7 +90,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
resourceMap, stateModelDefMap);
}
} catch (Exception e) {
- LogUtil.logError(logger, _eventId, "Could not update cluster status metrics!", e);
+ LogUtil
+ .logError(logger, _eventId, "Could not update cluster status metrics!", e);
}
return null;
}
@@ -103,8 +100,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
CurrentStateOutput currentStateOutput) {
- ResourceControllerDataProvider cache =
- event.getAttribute(AttributeName.ControllerDataProvider.name());
+ ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
BestPossibleStateOutput output = new BestPossibleStateOutput();
HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name());
@@ -116,50 +112,19 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
boolean isValid = validateOfflineInstancesLimit(cache,
(HelixManager) event.getAttribute(AttributeName.helixmanager.name()));
- // 1. Rebalance with the WAGED rebalancer
- // The rebalancer only calculates the new ideal assignment for all the resources that are
- // configured to use the WAGED rebalancer.
- // For the other resources, the legacy rebalancers will be triggered in the next step.
- Map<String, IdealState> newIdealStates = new HashMap<>();
- WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager);
- try {
- newIdealStates
- .putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput));
- } catch (HelixRebalanceException ex) {
- // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result.
- // Since it calculates for all the eligible resources globally, a partial result is invalid.
- // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring.
- LogUtil.logError(logger, _eventId, String
- .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s",
- wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex);
- }
-
final List<String> failureResources = new ArrayList<>();
Iterator<Resource> itr = resourceMap.values().iterator();
while (itr.hasNext()) {
Resource resource = itr.next();
boolean result = false;
- IdealState is = newIdealStates.get(resource.getResourceName());
- if (is != null) {
- // 2. Check if the WAGED rebalancer has calculated for this resource or not.
- result = checkBestPossibleStateCalculation(is);
- if (result) {
- // The WAGED rebalancer calculates a valid result, record in the output
- updateBestPossibleStateOutput(output, resource, is);
- }
- } else {
- // 3. The WAGED rebalancer skips calculating the resource assignment, fallback to use a
- // legacy resource rebalancer if applicable.
- // If this calculation fails, the resource will be reported in the failureResources list.
- try {
- result =
- computeSingleResourceBestPossibleState(event, cache, currentStateOutput, resource,
- output);
- } catch (HelixException ex) {
- LogUtil.logError(logger, _eventId,
- "Exception when calculating best possible states for " + resource.getResourceName(),
- ex);
- }
+ try {
+ result =
+ computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output);
+ } catch (HelixException ex) {
+ LogUtil.logError(logger, _eventId,
+ "Exception when calculating best possible states for " + resource.getResourceName(),
+ ex);
+
}
if (!result) {
failureResources.add(resource.getResourceName());
@@ -220,9 +185,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
if (manager != null) {
if (manager.getHelixDataAccessor()
.getProperty(manager.getHelixDataAccessor().keyBuilder().maintenance()) == null) {
- manager.getClusterManagmentTool()
- .autoEnableMaintenanceMode(manager.getClusterName(), true, errMsg,
- MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED);
+ manager.getClusterManagmentTool().autoEnableMaintenanceMode(manager.getClusterName(),
+ true, errMsg, MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED);
LogUtil.logWarn(logger, _eventId, errMsg);
}
} else {
@@ -235,19 +199,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
return true;
}
- private void updateBestPossibleStateOutput(BestPossibleStateOutput output, Resource resource,
- IdealState computedIdealState) {
- output.setPreferenceLists(resource.getResourceName(), computedIdealState.getPreferenceLists());
- for (Partition partition : resource.getPartitions()) {
- Map<String, String> newStateMap =
- computedIdealState.getInstanceStateMap(partition.getPartitionName());
- output.setState(resource.getResourceName(), partition, newStateMap);
- }
- }
-
- private boolean computeSingleResourceBestPossibleState(ClusterEvent event,
- ResourceControllerDataProvider cache, CurrentStateOutput currentStateOutput,
- Resource resource, BestPossibleStateOutput output) {
+ private boolean computeResourceBestPossibleState(ClusterEvent event, ResourceControllerDataProvider cache,
+ CurrentStateOutput currentStateOutput, Resource resource, BestPossibleStateOutput output) {
// for each ideal state
// read the state model def
// for each resource
@@ -276,13 +229,12 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
Rebalancer<ResourceControllerDataProvider> rebalancer =
getRebalancer(idealState, resourceName, cache.isMaintenanceModeEnabled());
- MappingCalculator<ResourceControllerDataProvider> mappingCalculator =
- getMappingCalculator(rebalancer, resourceName);
+ MappingCalculator<ResourceControllerDataProvider> mappingCalculator = getMappingCalculator(rebalancer, resourceName);
if (rebalancer == null || mappingCalculator == null) {
- LogUtil.logError(logger, _eventId, "Error computing assignment for resource " + resourceName
- + ". no rebalancer found. rebalancer: " + rebalancer + " mappingCalculator: "
- + mappingCalculator);
+ LogUtil.logError(logger, _eventId,
+ "Error computing assignment for resource " + resourceName + ". no rebalancer found. rebalancer: " + rebalancer
+ + " mappingCalculator: " + mappingCalculator);
}
if (rebalancer != null && mappingCalculator != null) {
@@ -347,8 +299,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
}
}
- private Rebalancer<ResourceControllerDataProvider> getRebalancer(IdealState idealState,
- String resourceName, boolean isMaintenanceModeEnabled) {
+ private Rebalancer<ResourceControllerDataProvider> getRebalancer(IdealState idealState, String resourceName,
+ boolean isMaintenanceModeEnabled) {
Rebalancer<ResourceControllerDataProvider> customizedRebalancer = null;
String rebalancerClassName = idealState.getRebalancerClassName();
if (rebalancerClassName != null) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 1221b6f..f92a66c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -19,8 +19,8 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
-import org.apache.helix.HelixConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
@@ -177,7 +177,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
// 5. test with best possible assignment but cluster topology is changed
clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
.collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
- _instances, Collections.singletonMap(HelixConstants.ChangeType.CONFIG,
+ _instances, Collections.singletonMap(ClusterDataDetector.ChangeType.ClusterConfigChange,
Collections.emptySet()), Collections.emptyMap(), bestPossibleAssignment);
// There should be no existing assignment since the topology change invalidates all existing assignment
Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
@@ -194,7 +194,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
String changedResourceName = _resourceNames.get(0);
clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
.collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
- _instances, Collections.singletonMap(HelixConstants.ChangeType.RESOURCE_CONFIG,
+ _instances, Collections.singletonMap(ClusterDataDetector.ChangeType.ResourceConfigChange,
Collections.singleton(changedResourceName)), Collections.emptyMap(),
bestPossibleAssignment);
// There should be no existing assignment for all the resource except for resource2.