You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:32:49 UTC

[helix] 11/50: Revert "Refine the WAGED rebalancer related interfaces for integration (#431)" (#437)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit cd85e673ae8c2f4ecbac537905c0fab144d68f4c
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Wed Aug 28 22:47:58 2019 -0700

    Revert "Refine the WAGED rebalancer related interfaces for integration (#431)" (#437)
    
    This reverts commit 08a2015c617ddd3c93525afc572081a7836f9476.
---
 .../org/apache/helix/HelixRebalanceException.java  | 43 ----------
 .../changedetector/ResourceChangeDetector.java     | 20 ++---
 .../controller/rebalancer/GlobalRebalancer.java    | 67 +++++++++++++++
 .../rebalancer/waged/ClusterDataDetector.java      | 73 ++++++++++++++++
 .../rebalancer/waged/ClusterDataProvider.java      | 54 ++++++++++++
 .../rebalancer/waged/WagedRebalancer.java          | 65 ++++----------
 .../waged/model/ClusterModelProvider.java          | 25 +++---
 .../stages/BestPossibleStateCalcStage.java         | 98 ++++++----------------
 .../waged/model/TestClusterModelProvider.java      |  6 +-
 9 files changed, 255 insertions(+), 196 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
deleted file mode 100644
index c01b173..0000000
--- a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.helix;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Exception thrown by Helix due to rebalance failures.
- */
-public class HelixRebalanceException extends Exception {
-  enum RebalanceFailureType {
-    INVALID_CLUSTER_STATUS,
-    INVALID_REBALANCER_STATUS,
-    FAILED_TO_CALCULATE,
-    UNKNOWN_FAILURE
-  }
-
-  private final RebalanceFailureType _type;
-
-  public HelixRebalanceException(String message, RebalanceFailureType type, Throwable cause) {
-    super(String.format("%s. Failure Type: %s", message, type.name()), cause);
-    _type = type;
-  }
-
-  public RebalanceFailureType getFailureType() {
-    return _type;
-  }
-}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
index 611f4b2..d65e609 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
@@ -20,17 +20,15 @@ package org.apache.helix.controller.changedetector;
  */
 
 import com.google.common.collect.Sets;
-import org.apache.helix.HelixConstants;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 
 /**
  * ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from
@@ -39,7 +37,6 @@ import java.util.Map;
  * WARNING: the methods of this class are not thread-safe.
  */
 public class ResourceChangeDetector implements ChangeDetector {
-  private static final Logger LOG = LoggerFactory.getLogger(ResourceChangeDetector.class.getName());
 
   private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run
   private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run
@@ -111,13 +108,10 @@ public class ResourceChangeDetector implements ChangeDetector {
       return snapshot.getResourceConfigMap();
     case LIVE_INSTANCE:
       return snapshot.getLiveInstances();
-    case CONFIG:
-      return Collections.emptyMap();
     default:
-      LOG.warn(
-          "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: {}",
-          changeType);
-      return Collections.emptyMap();
+      throw new HelixException(String.format(
+          "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: %s",
+          changeType));
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java
new file mode 100644
index 0000000..a3b9b32
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java
@@ -0,0 +1,67 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Resource;
+
+import java.util.Map;
+
+public interface GlobalRebalancer<T extends BaseControllerDataProvider> {
+  enum RebalanceFailureType {
+    INVALID_CLUSTER_STATUS,
+    INVALID_REBALANCER_STATUS,
+    FAILED_TO_CALCULATE,
+    UNKNOWN_FAILURE
+  }
+
+  class RebalanceFailureReason {
+    private final static String DEFAULT_REASON_MESSAGE = "No detail";
+    private final RebalanceFailureType _type;
+    private final String _reason;
+
+    public RebalanceFailureReason(RebalanceFailureType type) {
+      this(type, DEFAULT_REASON_MESSAGE);
+    }
+
+    public RebalanceFailureReason(RebalanceFailureType type, String reason) {
+      _type = type;
+      _reason = reason;
+    }
+
+    public RebalanceFailureType get_type() {
+      return _type;
+    }
+
+    public String get_reason() {
+      return _reason;
+    }
+  }
+
+  void init(HelixManager manager);
+
+  Map<String, IdealState> computeNewIdealState(final CurrentStateOutput currentStateOutput,
+      T clusterData, Map<String, Resource> resourceMap);
+
+  RebalanceFailureReason getFailureReason();
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
new file mode 100644
index 0000000..0423edf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java
@@ -0,0 +1,73 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A placeholder before we have the Cluster Data Detector implemented.
+ *
+ * @param <T> The cache class that can be handled by the detector.
+ */
+public class ClusterDataDetector<T extends BaseControllerDataProvider> {
+  /**
+   * All the cluster change type that may trigger a WAGED rebalancer re-calculation.
+   */
+  public enum ChangeType {
+    BaselineAssignmentChange,
+    InstanceConfigChange,
+    ClusterConfigChange,
+    ResourceConfigChange,
+    ResourceIdealStatesChange,
+    InstanceStateChange,
+    OtherChange
+  }
+
+  private Map<ChangeType, Set<String>> _currentChanges =
+      Collections.singletonMap(ChangeType.ClusterConfigChange, Collections.emptySet());
+
+  public void updateClusterStatus(T cache) {
+  }
+
+  /**
+   * Returns all change types detected during the ClusterDetection stage.
+   */
+  public Set<ChangeType> getChangeTypes() {
+    return _currentChanges.keySet();
+  }
+
+  /**
+   * Returns a set of the names of components that changed based on the given change type.
+   */
+  public Set<String> getChangesBasedOnType(ChangeType changeType) {
+    return _currentChanges.get(changeType);
+  }
+
+  /**
+   * Return a map of the change details <type, change details>.
+   */
+  public Map<ChangeType, Set<String>> getAllChanges() {
+    return _currentChanges;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java
new file mode 100644
index 0000000..387666c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java
@@ -0,0 +1,54 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.model.ResourceAssignment;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A placeholder before we have the implementation.
+ *
+ * The data provider generates the Cluster Model based on the controller's data cache.
+ */
+public class ClusterDataProvider {
+
+  /**
+   * @param dataProvider           The controller's data cache.
+   * @param activeInstances        The logical active instances that will be used in the calculation. Note
+   *                               This list can be different from the real active node list according to
+   *                               the rebalancer logic.
+   * @param clusterChanges         All the cluster changes that happened after the previous rebalance.
+   * @param baselineAssignment     The persisted Baseline assignment.
+   * @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the
+   *                               previous rebalance.
+   * @return The cluster model as the input for the upcoming rebalance.
+   */
+  protected static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
+      Set<String> activeInstances, Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
+      Map<String, ResourceAssignment> baselineAssignment,
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // TODO finish the implementation.
+    return null;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index fd740e6..aa3cfee 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -19,13 +19,10 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixRebalanceException;
-import org.apache.helix.controller.changedetector.ResourceChangeDetector;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
-import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
-import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintsRebalanceAlgorithm;
+import org.apache.helix.controller.rebalancer.GlobalRebalancer;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Resource;
@@ -39,57 +36,23 @@ import java.util.Map;
  * A placeholder before we have the implementation.
  * Weight-Aware Globally-Even Distribute Rebalancer.
  *
- * @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
- * Design Document
- * </a>
+ * @see <a href="Design Document">https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer</a>
  */
-public class WagedRebalancer {
+public class WagedRebalancer implements GlobalRebalancer<ResourceControllerDataProvider> {
   private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
 
-  // --------- The following fields are placeholders and need replacement. -----------//
-  // TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization?
-  private final AssignmentMetadataStore _assignmentMetadataStore;
-  private final RebalanceAlgorithm _rebalanceAlgorithm;
-  // ------------------------------------------------------------------------------------//
+  @Override
+  public void init(HelixManager manager) { }
 
-  // The cluster change detector is a stateful object. Make it static to avoid unnecessary
-  // reinitialization.
-  private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
-      new ThreadLocal<>();
-  private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
-
-  private ResourceChangeDetector getChangeDetector() {
-    if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) {
-      CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector());
-    }
-    return CHANGE_DETECTOR_THREAD_LOCAL.get();
-  }
-
-  public WagedRebalancer(HelixManager helixManager) {
-    // TODO init the metadata store according to their requirement when integrate, or change to final static method if possible.
-    _assignmentMetadataStore = new AssignmentMetadataStore();
-    // TODO init the algorithm according to the requirement when integrate.
-    _rebalanceAlgorithm = new ConstraintsRebalanceAlgorithm();
-
-    // Use the mapping calculator in DelayedAutoRebalancer for calculating the final assignment
-    // output.
-    // This calculator will translate the best possible assignment into an applicable state mapping
-    // based on the current states.
-    // TODO abstract and separate the mapping calculator logic from the DelayedAutoRebalancer
-    _mappingCalculator = new DelayedAutoRebalancer();
+  @Override
+  public Map<String, IdealState> computeNewIdealState(CurrentStateOutput currentStateOutput,
+      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap)
+      throws HelixException {
+    return new HashMap<>();
   }
 
-  /**
-   * Compute the new IdealStates for all the resources input. The IdealStates include both the new
-   * partition assignment (in the listFiles) and the new replica state mapping (in the mapFields).
-   * @param clusterData        The Cluster status data provider.
-   * @param resourceMap        A map containing all the rebalancing resources.
-   * @param currentStateOutput The present Current State of the cluster.
-   * @return A map containing the computed new IdealStates.
-   */
-  public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
-      Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
-      throws HelixRebalanceException {
-    return new HashMap<>();
+  @Override
+  public RebalanceFailureReason getFailureReason() {
+    return new RebalanceFailureReason(RebalanceFailureType.UNKNOWN_FAILURE);
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index c4f7d02..9de023b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -19,9 +19,9 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixException;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -59,7 +59,7 @@ public class ClusterModelProvider {
    */
   public static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
       Map<String, Resource> resourceMap, Set<String> activeInstances,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges,
+      Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
       Map<String, ResourceAssignment> baselineAssignment,
       Map<String, ResourceAssignment> bestPossibleAssignment) {
     // Generate replica objects for all the resource partitions.
@@ -108,13 +108,14 @@ public class ClusterModelProvider {
    */
   private static Set<AssignableReplica> findToBeAssignedReplicas(
       Map<String, Set<AssignableReplica>> replicaMap,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Set<String> activeInstances,
+      Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges, Set<String> activeInstances,
       Map<String, ResourceAssignment> bestPossibleAssignment,
       Map<String, Set<AssignableReplica>> allocatedReplicas) {
     Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
-    if (clusterChanges.containsKey(HelixConstants.ChangeType.CONFIG)
-        || clusterChanges.containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
-      // If the cluster topology has been modified, need to reassign all replicas
+    if (clusterChanges.containsKey(ClusterDataDetector.ChangeType.ClusterConfigChange)
+        || clusterChanges.containsKey(ClusterDataDetector.ChangeType.InstanceConfigChange)
+        || clusterChanges.containsKey(ClusterDataDetector.ChangeType.BaselineAssignmentChange)) {
+      // If the cluster topology or baseline assignment has been modified, need to reassign all replicas
       toBeAssignedReplicas
           .addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
     } else {
@@ -123,13 +124,11 @@ public class ClusterModelProvider {
         Set<AssignableReplica> replicas = replicaMap.get(resourceName);
         // 1. if the resource config/idealstate is changed, need to reassign.
         // 2. if the resource does appear in the best possible assignment, need to reassign.
-        if (clusterChanges
-            .getOrDefault(HelixConstants.ChangeType.RESOURCE_CONFIG, Collections.emptySet())
-            .contains(resourceName)
-            || clusterChanges
-            .getOrDefault(HelixConstants.ChangeType.IDEAL_STATE, Collections.emptySet())
-            .contains(resourceName)
-            || !bestPossibleAssignment.containsKey(resourceName)) {
+        if (clusterChanges.getOrDefault(ClusterDataDetector.ChangeType.ResourceConfigChange,
+            Collections.emptySet()).contains(resourceName) || clusterChanges
+            .getOrDefault(ClusterDataDetector.ChangeType.ResourceIdealStatesChange,
+                Collections.emptySet()).contains(resourceName) || !bestPossibleAssignment
+            .containsKey(resourceName)) {
           toBeAssignedReplicas.addAll(replicas);
           continue; // go to check next resource
         } else {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index ba4da88..49a72e0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -28,7 +27,6 @@ import java.util.concurrent.Callable;
 
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -39,7 +37,6 @@ import org.apache.helix.controller.rebalancer.MaintenanceRebalancer;
 import org.apache.helix.controller.rebalancer.Rebalancer;
 import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
-import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.MaintenanceSignal;
@@ -59,19 +56,18 @@ import org.slf4j.LoggerFactory;
  * IdealState,StateModel,LiveInstance
  */
 public class BestPossibleStateCalcStage extends AbstractBaseStage {
-  private static final Logger logger =
-      LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
+  private static final Logger logger = LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName());
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     _eventId = event.getEventId();
-    CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
+    CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.name());
     final Map<String, Resource> resourceMap =
         event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
     final ClusterStatusMonitor clusterStatusMonitor =
         event.getAttribute(AttributeName.clusterStatusMonitor.name());
-    ResourceControllerDataProvider cache =
-        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
 
     if (currentStateOutput == null || resourceMap == null || cache == null) {
       throw new StageException(
@@ -94,7 +90,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
                     resourceMap, stateModelDefMap);
           }
         } catch (Exception e) {
-          LogUtil.logError(logger, _eventId, "Could not update cluster status metrics!", e);
+          LogUtil
+              .logError(logger, _eventId, "Could not update cluster status metrics!", e);
         }
         return null;
       }
@@ -103,8 +100,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
 
   private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource> resourceMap,
       CurrentStateOutput currentStateOutput) {
-    ResourceControllerDataProvider cache =
-        event.getAttribute(AttributeName.ControllerDataProvider.name());
+    ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
     BestPossibleStateOutput output = new BestPossibleStateOutput();
 
     HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name());
@@ -116,50 +112,19 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     boolean isValid = validateOfflineInstancesLimit(cache,
         (HelixManager) event.getAttribute(AttributeName.helixmanager.name()));
 
-    // 1. Rebalance with the WAGED rebalancer
-    // The rebalancer only calculates the new ideal assignment for all the resources that are
-    // configured to use the WAGED rebalancer.
-    // For the other resources, the legacy rebalancers will be triggered in the next step.
-    Map<String, IdealState> newIdealStates = new HashMap<>();
-    WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager);
-    try {
-      newIdealStates
-          .putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput));
-    } catch (HelixRebalanceException ex) {
-      // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result.
-      // Since it calculates for all the eligible resources globally, a partial result is invalid.
-      // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring.
-      LogUtil.logError(logger, _eventId, String
-          .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s",
-              wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex);
-    }
-
     final List<String> failureResources = new ArrayList<>();
     Iterator<Resource> itr = resourceMap.values().iterator();
     while (itr.hasNext()) {
       Resource resource = itr.next();
       boolean result = false;
-      IdealState is = newIdealStates.get(resource.getResourceName());
-      if (is != null) {
-        // 2. Check if the WAGED rebalancer has calculated for this resource or not.
-        result = checkBestPossibleStateCalculation(is);
-        if (result) {
-          // The WAGED rebalancer calculates a valid result, record in the output
-          updateBestPossibleStateOutput(output, resource, is);
-        }
-      } else {
-        // 3. The WAGED rebalancer skips calculating the resource assignment, fallback to use a
-        // legacy resource rebalancer if applicable.
-        // If this calculation fails, the resource will be reported in the failureResources list.
-        try {
-          result =
-              computeSingleResourceBestPossibleState(event, cache, currentStateOutput, resource,
-                  output);
-        } catch (HelixException ex) {
-          LogUtil.logError(logger, _eventId,
-              "Exception when calculating best possible states for " + resource.getResourceName(),
-              ex);
-        }
+      try {
+        result =
+            computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output);
+      } catch (HelixException ex) {
+        LogUtil.logError(logger, _eventId,
+            "Exception when calculating best possible states for " + resource.getResourceName(),
+            ex);
+
       }
       if (!result) {
         failureResources.add(resource.getResourceName());
@@ -220,9 +185,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
         if (manager != null) {
           if (manager.getHelixDataAccessor()
               .getProperty(manager.getHelixDataAccessor().keyBuilder().maintenance()) == null) {
-            manager.getClusterManagmentTool()
-                .autoEnableMaintenanceMode(manager.getClusterName(), true, errMsg,
-                    MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED);
+            manager.getClusterManagmentTool().autoEnableMaintenanceMode(manager.getClusterName(),
+                true, errMsg, MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED);
             LogUtil.logWarn(logger, _eventId, errMsg);
           }
         } else {
@@ -235,19 +199,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     return true;
   }
 
-  private void updateBestPossibleStateOutput(BestPossibleStateOutput output, Resource resource,
-      IdealState computedIdealState) {
-    output.setPreferenceLists(resource.getResourceName(), computedIdealState.getPreferenceLists());
-    for (Partition partition : resource.getPartitions()) {
-      Map<String, String> newStateMap =
-          computedIdealState.getInstanceStateMap(partition.getPartitionName());
-      output.setState(resource.getResourceName(), partition, newStateMap);
-    }
-  }
-
-  private boolean computeSingleResourceBestPossibleState(ClusterEvent event,
-      ResourceControllerDataProvider cache, CurrentStateOutput currentStateOutput,
-      Resource resource, BestPossibleStateOutput output) {
+  private boolean computeResourceBestPossibleState(ClusterEvent event, ResourceControllerDataProvider cache,
+      CurrentStateOutput currentStateOutput, Resource resource, BestPossibleStateOutput output) {
     // for each ideal state
     // read the state model def
     // for each resource
@@ -276,13 +229,12 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
 
     Rebalancer<ResourceControllerDataProvider> rebalancer =
         getRebalancer(idealState, resourceName, cache.isMaintenanceModeEnabled());
-    MappingCalculator<ResourceControllerDataProvider> mappingCalculator =
-        getMappingCalculator(rebalancer, resourceName);
+    MappingCalculator<ResourceControllerDataProvider> mappingCalculator = getMappingCalculator(rebalancer, resourceName);
 
     if (rebalancer == null || mappingCalculator == null) {
-      LogUtil.logError(logger, _eventId, "Error computing assignment for resource " + resourceName
-          + ". no rebalancer found. rebalancer: " + rebalancer + " mappingCalculator: "
-          + mappingCalculator);
+      LogUtil.logError(logger, _eventId,
+          "Error computing assignment for resource " + resourceName + ". no rebalancer found. rebalancer: " + rebalancer
+              + " mappingCalculator: " + mappingCalculator);
     }
 
     if (rebalancer != null && mappingCalculator != null) {
@@ -347,8 +299,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     }
   }
 
-  private Rebalancer<ResourceControllerDataProvider> getRebalancer(IdealState idealState,
-      String resourceName, boolean isMaintenanceModeEnabled) {
+  private Rebalancer<ResourceControllerDataProvider> getRebalancer(IdealState idealState, String resourceName,
+      boolean isMaintenanceModeEnabled) {
     Rebalancer<ResourceControllerDataProvider> customizedRebalancer = null;
     String rebalancerClassName = idealState.getRebalancerClassName();
     if (rebalancerClassName != null) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 1221b6f..f92a66c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -19,8 +19,8 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import org.apache.helix.HelixConstants;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
@@ -177,7 +177,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     // 5. test with best possible assignment but cluster topology is changed
     clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
             .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
-        _instances, Collections.singletonMap(HelixConstants.ChangeType.CONFIG,
+        _instances, Collections.singletonMap(ClusterDataDetector.ChangeType.ClusterConfigChange,
             Collections.emptySet()), Collections.emptyMap(), bestPossibleAssignment);
     // There should be no existing assignment since the topology change invalidates all existing assignment
     Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
@@ -194,7 +194,7 @@ public class TestClusterModelProvider extends AbstractTestClusterModel {
     String changedResourceName = _resourceNames.get(0);
     clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream()
             .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))),
-        _instances, Collections.singletonMap(HelixConstants.ChangeType.RESOURCE_CONFIG,
+        _instances, Collections.singletonMap(ClusterDataDetector.ChangeType.ResourceConfigChange,
             Collections.singleton(changedResourceName)), Collections.emptyMap(),
         bestPossibleAssignment);
     // There should be no existing assignment for all the resource except for resource2.