You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/06/15 19:02:07 UTC

[helix] 01/03: Add Abnormal States Resolver interface and configuration item. (#1028)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch abnormalResolver
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 7ad551b9fb64aef70f485c658c09a3145baa8bbf
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Thu May 28 15:55:33 2020 -0700

    Add Abnormal States Resolver interface and configuration item. (#1028)
    
    The Abnormal States Resolver defines a generic interface to find and recover if the partition has any abnormal current states. For example,
    - double masters
    - application data out of sync
    The interface shall be implemented according to the requirement.
    
    The resolver is applied in the rebalance process according to the corresponding cluster config item. For example,
    "ABNORMAL_STATES_RESOLVER_MAP" : {
     "MASTERSLAVE" : "org.apache.helix.api.rebalancer.constraint.MasterSlaveAbnormalStateReslovler"
    }
    The default behavior without any configuration is not doing any recovery work.
---
 .../constraint/AbnormalStateResolver.java          | 75 ++++++++++++++++++
 .../dataproviders/BaseControllerDataProvider.java  | 45 ++++++++++-
 .../controller/rebalancer/AbstractRebalancer.java  | 89 +++++++++++++++++-----
 .../rebalancer/DelayedAutoRebalancer.java          | 40 +++-------
 .../java/org/apache/helix/model/ClusterConfig.java | 28 ++++++-
 .../rebalancer/TestAbstractRebalancer.java         |  4 +-
 .../rebalancer/TestAutoRebalanceStrategy.java      |  6 +-
 .../rebalancer/TestZeroReplicaAvoidance.java       | 10 ++-
 .../constraint/MockAbnormalStateResolver.java      | 48 ++++++++++++
 .../waged/model/AbstractTestClusterModel.java      |  4 +
 .../rebalancer/TestAbnormalStatesResolver.java     | 67 ++++++++++++++++
 .../org/apache/helix/model/TestClusterConfig.java  | 16 ++++
 12 files changed, 376 insertions(+), 56 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
new file mode 100644
index 0000000..7e9946c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
@@ -0,0 +1,75 @@
+package org.apache.helix.api.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * A generic interface to find and recover if the partition has abnormal current states.
+ */
+public interface AbnormalStateResolver {
+  /**
+   * A placeholder which will be used when the resolver is not specified.
+   * This is a dummy class that does not really functional.
+   */
+  AbnormalStateResolver DUMMY_STATE_RESOLVER = new AbnormalStateResolver() {
+    public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+        final String resourceName, final Partition partition,
+        final StateModelDefinition stateModelDef) {
+      // By default, all current states are valid.
+      return true;
+    }
+    public Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
+        final String resourceName, final Partition partition,
+        final StateModelDefinition stateModelDef, final List<String> preferenceList) {
+      throw new UnsupportedOperationException("This resolver won't recover abnormal states.");
+    }
+  };
+
+  /**
+   * Check if the current states of the specified partition is valid.
+   * @param currentStateOutput
+   * @param resourceName
+   * @param partition
+   * @param stateModelDef
+   * @return true if the current states of the specified partition is valid.
+   */
+  boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef);
+
+  /**
+   * Compute a transient partition state assignment to fix the abnormal.
+   * @param currentStateOutput
+   * @param resourceName
+   * @param partition
+   * @param stateModelDef
+   * @param preferenceList
+   * @return the transient partition state assignment which remove the abnormal states.
+   */
+  Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef, final List<String> preferenceList);
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index a24ea46..59058ab 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -34,8 +34,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.common.caches.AbstractDataCache;
 import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.common.caches.InstanceMessagesCache;
@@ -53,6 +55,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,6 +106,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
   private Set<String> _disabledInstanceSet = new HashSet<>();
+  private final Map<String, AbnormalStateResolver> _abnormalStateResolverMap = new HashMap<>();
 
   public BaseControllerDataProvider() {
     this(AbstractDataCache.UNKNOWN_CLUSTER, AbstractDataCache.UNKNOWN_PIPELINE);
@@ -225,6 +229,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CLUSTER_CONFIG).getAndSet(false)) {
       _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
       refreshedType.add(HelixConstants.ChangeType.CLUSTER_CONFIG);
+      refreshAbnormalStateResolverMap(_clusterConfig);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String.format(
           "No ClusterConfig change for cluster %s, pipeline %s", _clusterName, getPipelineName()));
@@ -372,6 +377,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
 
   public void setClusterConfig(ClusterConfig clusterConfig) {
     _clusterConfig = clusterConfig;
+    refreshAbnormalStateResolverMap(_clusterConfig);
   }
 
   @Override
@@ -731,6 +737,43 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     _asyncTasksThreadPool = asyncTasksThreadPool;
   }
 
+
+  public AbnormalStateResolver getAbnormalStateResolver(String stateModel) {
+    return _abnormalStateResolverMap
+        .getOrDefault(stateModel, AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+  }
+
+  private void refreshAbnormalStateResolverMap(ClusterConfig clusterConfig) {
+    if (clusterConfig == null) {
+      logger.debug("Skip refreshing abnormal state resolvers because the ClusterConfig is missing");
+      return;
+    }
+    Map<String, String> resolverMap = clusterConfig.getAbnormalStateResolverMap();
+    logger.info("Start loading the abnormal state resolvers with configuration {}", resolverMap);
+    // Remove any resolver configuration that does not exist anymore.
+    _abnormalStateResolverMap.keySet().retainAll(resolverMap.keySet());
+    // Reload the resolver classes into cache based on the configuration.
+    for (String stateModel : resolverMap.keySet()) {
+      String resolverClassName = resolverMap.get(stateModel);
+      if (resolverClassName == null || resolverClassName.isEmpty()) {
+        // skip the empty definition.
+        continue;
+      }
+      if (!resolverClassName.equals(getAbnormalStateResolver(stateModel).getClass().getName())) {
+        try {
+          AbnormalStateResolver resolver = AbnormalStateResolver.class
+              .cast(HelixUtil.loadClass(getClass(), resolverClassName).newInstance());
+          _abnormalStateResolverMap.put(stateModel, resolver);
+        } catch (Exception e) {
+          throw new HelixException(String
+              .format("Failed to instantiate the abnormal state resolver %s for state model %s",
+                  resolverClassName, stateModel));
+        }
+      } // else, nothing to update since the same resolver class has been loaded.
+    }
+    logger.info("Finish loading the abnormal state resolvers {}", _abnormalStateResolverMap);
+  }
+
   public boolean isMaintenanceModeEnabled() {
     return _isMaintenanceModeEnabled;
   }
@@ -768,4 +811,4 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   public String toString() {
     return genCacheContentStringBuilder().toString();
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index e85a2df..6fdd0b6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -27,11 +27,13 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
@@ -102,7 +104,8 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
       Map<String, String> bestStateForPartition =
           computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef,
               preferenceList, currentStateOutput, disabledInstancesForPartition, idealState,
-              cache.getClusterConfig(), partition);
+              cache.getClusterConfig(), partition,
+              cache.getAbnormalStateResolver(stateModelDefName));
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
@@ -179,44 +182,97 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
     return rebalanceStrategy;
   }
 
+  /**
+   * Compute best state for partition in AUTO ideal state mode.
+   * @param liveInstances
+   * @param stateModelDef
+   * @param preferenceList
+   * @param currentStateOutput instance->state for each partition
+   * @param disabledInstancesForPartition
+   * @param idealState
+   * @param clusterConfig
+   * @param partition
+   * @param resolver
+   * @return
+   */
   protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
-      IdealState idealState, ClusterConfig clusterConfig, Partition partition) {
+      IdealState idealState, ClusterConfig clusterConfig, Partition partition,
+      AbnormalStateResolver resolver) {
+    Optional<Map<String, String>> optionalOverwrittenStates =
+        computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
+            idealState, partition, resolver);
+    if (optionalOverwrittenStates.isPresent()) {
+      return optionalOverwrittenStates.get();
+    }
 
-    Map<String, String> currentStateMap =
-        currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition);
+    Map<String, String> currentStateMap = new HashMap<>(
+        currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition));
+    return computeBestPossibleMap(preferenceList, stateModelDef, currentStateMap, liveInstances,
+        disabledInstancesForPartition);
+  }
 
-    if (currentStateMap == null) {
-      currentStateMap = Collections.emptyMap();
-    }
+  /**
+   * Compute if an overwritten is necessary for the partition assignment in case that the proposed
+   * assignment is not valid or empty.
+   * @param stateModelDef
+   * @param preferenceList
+   * @param currentStateOutput
+   * @param idealState
+   * @param partition
+   * @param resolver
+   * @return An optional object which contains the assignment map if overwritten is necessary.
+   * Otherwise return Optional.empty().
+   */
+  protected Optional<Map<String, String>> computeStatesOverwriteForPartition(
+      final StateModelDefinition stateModelDef, final List<String> preferenceList,
+      final CurrentStateOutput currentStateOutput, IdealState idealState, final Partition partition,
+      final AbnormalStateResolver resolver) {
+    String resourceName = idealState.getResourceName();
+    Map<String, String> currentStateMap =
+        currentStateOutput.getCurrentStateMap(resourceName, partition);
 
     // (1) If the partition is removed from IS or the IS is deleted.
     // Transit to DROPPED no matter the instance is disabled or not.
     if (preferenceList == null) {
-      return computeBestPossibleMapForDroppedResource(currentStateMap);
+      return Optional.of(computeBestPossibleMapForDroppedResource(currentStateMap));
     }
 
     // (2) If resource disabled altogether, transit to initial-state (e.g. OFFLINE) if it's not in ERROR.
     if (!idealState.isEnabled()) {
-      return computeBestPossibleMapForDisabledResource(currentStateMap, stateModelDef);
+      return Optional.of(computeBestPossibleMapForDisabledResource(currentStateMap, stateModelDef));
     }
 
-    return computeBestPossibleMap(preferenceList, stateModelDef, currentStateMap, liveInstances,
-        disabledInstancesForPartition);
+    // (3) If the current states are not valid, fix the invalid part first.
+    if (!resolver.isCurrentStatesValid(currentStateOutput, resourceName, partition, stateModelDef)) {
+      Map<String, String> recoveryAssignment = resolver
+          .computeRecoveryAssignment(currentStateOutput, resourceName, partition, stateModelDef,
+              preferenceList);
+      if (recoveryAssignment == null || !recoveryAssignment.keySet()
+          .equals(currentStateMap.keySet())) {
+        throw new HelixException(String.format(
+            "Invalid recovery assignment %s since it changed the current partition placement %s",
+            recoveryAssignment, currentStateMap));
+      }
+      return Optional.of(recoveryAssignment);
+    }
+
+    return Optional.empty();
   }
 
-  protected Map<String, String> computeBestPossibleMapForDroppedResource(Map<String, String> currentStateMap) {
-    Map<String, String> bestPossibleStateMap = new HashMap<String, String>();
+  protected Map<String, String> computeBestPossibleMapForDroppedResource(
+      final Map<String, String> currentStateMap) {
+    Map<String, String> bestPossibleStateMap = new HashMap<>();
     for (String instance : currentStateMap.keySet()) {
       bestPossibleStateMap.put(instance, HelixDefinedState.DROPPED.toString());
     }
     return bestPossibleStateMap;
   }
 
-  protected Map<String, String> computeBestPossibleMapForDisabledResource(Map<String, String> currentStateMap
-      , StateModelDefinition stateModelDef) {
-    Map<String, String> bestPossibleStateMap = new HashMap<String, String>();
+  protected Map<String, String> computeBestPossibleMapForDisabledResource(
+      final Map<String, String> currentStateMap, StateModelDefinition stateModelDef) {
+    Map<String, String> bestPossibleStateMap = new HashMap<>();
     for (String instance : currentStateMap.keySet()) {
       if (!HelixDefinedState.ERROR.name().equals(currentStateMap.get(instance))) {
         bestPossibleStateMap.put(instance, stateModelDef.getInitialState());
@@ -267,7 +323,6 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
    */
   protected Map<String, String> computeBestPossibleMap(List<String> preferenceList, StateModelDefinition stateModelDef,
       Map<String, String> currentStateMap, Set<String> liveInstances, Set<String> disabledInstancesForPartition) {
-
     Map<String, String> bestPossibleStateMap = new HashMap<>();
 
     // (1) Instances that have current state but not in preference list, drop, no matter it's disabled or not.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index f4c95a6..f169e07 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -27,10 +27,12 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -263,7 +265,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
       Map<String, String> bestStateForPartition =
           computeBestPossibleStateForPartition(liveNodes, stateModelDef, preferenceList,
               currentStateOutput, disabledInstancesForPartition, idealState, clusterConfig,
-              partition);
+              partition, cache.getAbnormalStateResolver(stateModelDefName));
 
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
@@ -276,39 +278,20 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
     return partitionMapping;
   }
 
-  /**
-   * compute best state for resource in AUTO ideal state mode
-   * @param liveInstances
-   * @param stateModelDef
-   * @param preferenceList
-   * @param currentStateOutput
-   *          : instance->state for each partition
-   * @param disabledInstancesForPartition
-   * @param idealState
-   * @param  clusterConfig
-   * @param  partition
-   * @return
-   */
   @Override
   protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
-      IdealState idealState, ClusterConfig clusterConfig, Partition partition) {
-
+      IdealState idealState, ClusterConfig clusterConfig, Partition partition,
+      AbnormalStateResolver resolver) {
+    Optional<Map<String, String>> optionalOverwrittenStates =
+        computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
+            idealState, partition, resolver);
+    if (optionalOverwrittenStates.isPresent()) {
+      return optionalOverwrittenStates.get();
+    }
     Map<String, String> currentStateMap = new HashMap<>(
         currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition));
-
-    // (1) If the partition is removed from IS or the IS is deleted.
-    // Transit to DROPPED no matter the instance is disabled or not.
-    if (preferenceList == null) {
-      return computeBestPossibleMapForDroppedResource(currentStateMap);
-    }
-
-    // (2) If resource disabled altogether, transit to initial-state (e.g. OFFLINE) if it's not in ERROR.
-    if (!idealState.isEnabled()) {
-      return computeBestPossibleMapForDisabledResource(currentStateMap, stateModelDef);
-    }
-
     // Instances not in preference list but still have active replica, retain to avoid zero replica during movement
     List<String> currentInstances = new ArrayList<>(currentStateMap.keySet());
     Collections.sort(currentInstances);
@@ -332,7 +315,6 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
       }
     }
 
-
     // Sort the instancesToMove by their current partition state.
     // Reason: because the states are assigned to instances in the order appeared in preferenceList, if we have
     // [node1:Slave, node2:Master], we want to keep it that way, instead of assigning Master to node1.
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 165919c..bb0f728 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -30,10 +30,10 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
  * Cluster configurations
@@ -109,7 +109,13 @@ public class ClusterConfig extends HelixProperty {
     // https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
     //
     // Default to be true.
-    GLOBAL_REBALANCE_ASYNC_MODE
+    GLOBAL_REBALANCE_ASYNC_MODE,
+
+    /**
+     * Configure the abnormal partition states resolver classes for the corresponding state model.
+     * <State Model Def Name, Full Path of the Resolver Class Name>
+     */
+    ABNORMAL_STATES_RESOLVER_MAP
   }
 
   public enum GlobalRebalancePreferenceKey {
@@ -852,6 +858,24 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Set the abnormal state resolver class map.
+   */
+  public void setAbnormalStateResolverMap(Map<String, String> resolverMap) {
+    if (resolverMap.values().stream()
+        .anyMatch(className -> className == null || className.isEmpty())) {
+      throw new IllegalArgumentException(
+          "Invalid Abnormal State Resolver Map definition. Class name cannot be empty.");
+    }
+    _record.setMapField(ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name(), resolverMap);
+  }
+
+  public Map<String, String> getAbnormalStateResolverMap() {
+    Map<String, String> resolverMap =
+        _record.getMapField(ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name());
+    return resolverMap == null ? Collections.EMPTY_MAP : resolverMap;
+  }
+
+  /**
    * Get IdealState rules defined in the cluster config.
    * @return
    */
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
index 0886768..72bb726 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -52,7 +53,8 @@ public class TestAbstractRebalancer {
         .computeBestPossibleStateForPartition(new HashSet<>(liveInstances),
             BuiltInStateModelDefinitions.valueOf(stateModelName).getStateModelDefinition(),
             preferenceList, currentStateOutput, new HashSet<>(disabledInstancesForPartition),
-            new IdealState("test"), new ClusterConfig("TestCluster"), partition);
+            new IdealState("test"), new ClusterConfig("TestCluster"), partition,
+            AbnormalStateResolver.DUMMY_STATE_RESOLVER);
 
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap);
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index 0b1370e..0d09079 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -41,7 +41,7 @@ import com.google.common.collect.Sets;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.MockAccessor;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -52,6 +52,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -243,7 +244,8 @@ public class TestAutoRebalanceStrategy {
         }
         Map<String, String> assignment = new AutoRebalancer()
             .computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), _stateModelDef,
-                preferenceList, currentStateOutput, disabled, is, clusterConfig, p);
+                preferenceList, currentStateOutput, disabled, is, clusterConfig, p,
+                AbnormalStateResolver.DUMMY_STATE_RESOLVER);
         mapResult.put(partition, assignment);
       }
       return mapResult;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
index 9a5e085..33885ca 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
@@ -32,7 +32,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -41,6 +41,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
 import org.testng.Assert;
@@ -85,9 +86,10 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
         }
       }
     }
-    Map<String, String> bestPossibleMap = rebalancer.computeBestPossibleStateForPartition(
-        liveInstances, stateModelDef, instancePreferenceList, currentStateOutput,
-        Collections.emptySet(), is, new ClusterConfig("TestCluster"), partition);
+    Map<String, String> bestPossibleMap = rebalancer
+        .computeBestPossibleStateForPartition(liveInstances, stateModelDef, instancePreferenceList,
+            currentStateOutput, Collections.emptySet(), is, new ClusterConfig("TestCluster"),
+            partition, AbnormalStateResolver.DUMMY_STATE_RESOLVER);
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap,
         "Differs, get " + bestPossibleMap + "\nexpected: " + expectedBestPossibleMap
             + "\ncurrentState: " + currentStateMap + "\npreferenceList: " + instancePreferenceList);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
new file mode 100644
index 0000000..4718921
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
@@ -0,0 +1,48 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * A mock abnormal state resolver for supporting tests.
+ * It always return dummy result.
+ */
+public class MockAbnormalStateResolver implements AbnormalStateResolver {
+  @Override
+  public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef) {
+    // By default, all current states are valid.
+    return true;
+  }
+
+  public Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef, final List<String> preferenceList) {
+    throw new UnsupportedOperationException("The mock resolver won't recover abnormal states.");
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 7f8281b..ca8fd53 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -39,6 +40,7 @@ import org.apache.helix.model.ResourceConfig;
 import org.mockito.Mockito;
 import org.testng.annotations.BeforeClass;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.when;
 
 public abstract class AbstractTestClusterModel {
@@ -109,6 +111,8 @@ public abstract class AbstractTestClusterModel {
         _capacityDataMap.keySet().stream().collect(Collectors.toMap(key -> key, key -> 0)));
     testClusterConfig.setTopologyAwareEnabled(true);
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+    when(testCache.getAbnormalStateResolver(any()))
+        .thenReturn(AbnormalStateResolver.DUMMY_STATE_RESOLVER);
 
     // 3. Mock the live instance node for the default instance.
     LiveInstance testLiveInstance = createMockLiveInstance(_testInstanceId);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
new file mode 100644
index 0000000..dde2644
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
@@ -0,0 +1,67 @@
+package org.apache.helix.integration.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
+  @Test
+  public void testConfigureResolver() {
+    ResourceControllerDataProvider cache = new ResourceControllerDataProvider(CLUSTER_NAME);
+    // Verify the initial setup.
+    cache.refresh(_controller.getHelixDataAccessor());
+    for (String stateModelDefName : cache.getStateModelDefMap().keySet()) {
+      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getClass(),
+          AbnormalStateResolver.DUMMY_STATE_RESOLVER.getClass());
+    }
+
+    // Update the resolver configuration for MasterSlave state model.
+    ConfigAccessor configAccessor = new ConfigAccessor.Builder().setZkAddress(ZK_ADDR).build();
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setAbnormalStateResolverMap(
+        ImmutableMap.of(MasterSlaveSMD.name, MockAbnormalStateResolver.class.getName()));
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    cache.requireFullRefresh();
+    cache.refresh(_controller.getHelixDataAccessor());
+    for (String stateModelDefName : cache.getStateModelDefMap().keySet()) {
+      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getClass(),
+          stateModelDefName.equals(MasterSlaveSMD.name) ?
+              MockAbnormalStateResolver.class :
+              AbnormalStateResolver.DUMMY_STATE_RESOLVER.getClass());
+    }
+
+    // Reset the resolver map
+    clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 8e4a016..f353293 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -256,4 +257,19 @@ public class TestClusterConfig {
         .getBooleanField(ClusterConfig.ClusterConfigProperty.GLOBAL_REBALANCE_ASYNC_MODE.name(),
             false), true);
   }
+
+  @Test
+  public void testAbnormalStatesResolverConfig() {
+    ClusterConfig testConfig = new ClusterConfig("testConfig");
+    // Default value is empty
+    Assert.assertEquals(testConfig.getAbnormalStateResolverMap(), Collections.EMPTY_MAP);
+    // Test set
+    Map<String, String> resolverMap = ImmutableMap.of(MasterSlaveSMD.name,
+        MockAbnormalStateResolver.class.getName());
+    testConfig.setAbnormalStateResolverMap(resolverMap);
+    Assert.assertEquals(testConfig.getAbnormalStateResolverMap(), resolverMap);
+    // Test empty the map
+    testConfig.setAbnormalStateResolverMap(Collections.emptyMap());
+    Assert.assertEquals(testConfig.getAbnormalStateResolverMap(), Collections.EMPTY_MAP);
+  }
 }