You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/06/15 23:41:19 UTC

[helix] branch master updated (b9af362 -> f85cbd1)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git.


    from b9af362  Move serializers to zookeeper-api (#1085)
     new dc75f22  Add Abnormal States Resolver interface and configuration item. (#1028)
     new 0c00596  Add ExcessiveTopStateResolver to gracefully fix the double-masters situation. (#1037)
     new f85cbd1  Add monitor to record the abnormal states processing. (#1059)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../constraint/AbnormalStateResolver.java          |  57 ++++++
 .../dataproviders/BaseControllerDataProvider.java  |  68 +++++++-
 .../controller/rebalancer/AbstractRebalancer.java  |  92 ++++++++--
 .../rebalancer/DelayedAutoRebalancer.java          |  40 ++---
 .../constraint/ExcessiveTopStateResolver.java      | 123 +++++++++++++
 .../constraint/MonitoredAbnormalResolver.java      | 117 +++++++++++++
 .../java/org/apache/helix/model/ClusterConfig.java |  28 ++-
 .../metrics/AbnormalStatesMetricCollector.java     |  67 +++++++
 .../rebalancer/TestAbstractRebalancer.java         |   4 +-
 .../rebalancer/TestAutoRebalanceStrategy.java      |   6 +-
 .../rebalancer/TestZeroReplicaAvoidance.java       |  10 +-
 .../constraint/MockAbnormalStateResolver.java      |  48 ++++++
 .../TestAbnormalStatesResolverMonitor.java         |  88 ++++++++++
 .../waged/model/AbstractTestClusterModel.java      |   4 +
 .../rebalancer/TestAbnormalStatesResolver.java     | 192 +++++++++++++++++++++
 .../org/apache/helix/model/TestClusterConfig.java  |  16 ++
 16 files changed, 903 insertions(+), 57 deletions(-)
 create mode 100644 helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
 create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java
 create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/MonitoredAbnormalResolver.java
 create mode 100644 helix-core/src/main/java/org/apache/helix/monitoring/metrics/AbnormalStatesMetricCollector.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/TestAbnormalStatesResolverMonitor.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java


[helix] 02/03: Add ExcessiveTopStateResolver to gracefully fix the double-masters situation. (#1037)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 0c00596832f320c6ed3f8d77d898d47bef7b68b4
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Thu Jun 4 11:50:23 2020 -0700

    Add ExcessiveTopStateResolver to gracefully fix the double-masters situation. (#1037)
    
    Although the rebalancer eventually removes the extra master, its default choice of which master to keep is arbitrary, so an older master may survive. This may cause serious application logic issues since many applications require the master to have the latest data.
    With this state resolver, the rebalancer instead resets all the master replicas, ensuring that the remaining master is the youngest one. The double-masters situation is then resolved gracefully.
---
 .../constraint/AbnormalStateResolver.java          |   4 +-
 .../controller/rebalancer/AbstractRebalancer.java  |   2 +-
 .../constraint/ExcessiveTopStateResolver.java      | 123 +++++++++++++++++++++
 .../constraint/MockAbnormalStateResolver.java      |   2 +-
 .../rebalancer/TestAbnormalStatesResolver.java     | 117 ++++++++++++++++++++
 5 files changed, 244 insertions(+), 4 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
index 7e9946c..1a75e0b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
+++ b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
@@ -35,7 +35,7 @@ public interface AbnormalStateResolver {
    * This is a dummy class that does not really functional.
    */
   AbnormalStateResolver DUMMY_STATE_RESOLVER = new AbnormalStateResolver() {
-    public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+    public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
         final String resourceName, final Partition partition,
         final StateModelDefinition stateModelDef) {
       // By default, all current states are valid.
@@ -56,7 +56,7 @@ public interface AbnormalStateResolver {
    * @param stateModelDef
    * @return true if the current states of the specified partition is valid.
    */
-  boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+  boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
       final String resourceName, final Partition partition,
       final StateModelDefinition stateModelDef);
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 6fdd0b6..a1ca6ec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -245,7 +245,7 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
     }
 
     // (3) If the current states are not valid, fix the invalid part first.
-    if (!resolver.isCurrentStatesValid(currentStateOutput, resourceName, partition, stateModelDef)) {
+    if (!resolver.checkCurrentStates(currentStateOutput, resourceName, partition, stateModelDef)) {
       Map<String, String> recoveryAssignment = resolver
           .computeRecoveryAssignment(currentStateOutput, resourceName, partition, stateModelDef,
               preferenceList);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java
new file mode 100644
index 0000000..3c1e100
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java
@@ -0,0 +1,123 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The abnormal state resolver that gracefully fixes the abnormality of excessive top states for
+ * single-topstate state model. For example, two replcias of a MasterSlave partition are assigned
+ * with the Master state at the same time. This could be caused by a network partitioning or the
+ * other unexpected issues.
+ *
+ * The resolver checks for the abnormality and computes recovery assignment which triggers the
+ * rebalancer to eventually reset all the top state replias for once. After the resets, only one
+ * replica will be assigned the top state.
+ *
+ * Note that without using this resolver, the regular Helix rebalance pipeline also removes the
+ * excessive top state replicas. However, the default logic does not force resetting ALL the top
+ * state replicas. Since the multiple top states situation may break application data, the default
+ * resolution won't be enough to fix the potential problem.
+ */
+public class ExcessiveTopStateResolver implements AbnormalStateResolver {
+  private static final Logger LOG = LoggerFactory.getLogger(ExcessiveTopStateResolver.class);
+
+  /**
+   * The current states are not valid if there are more than one top state replicas for a single top
+   * state state model.
+   */
+  @Override
+  public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition, StateModelDefinition stateModelDef) {
+    if (!stateModelDef.isSingleTopStateModel()) {
+      return true;
+    }
+    // TODO: Cache top state count in the ResourceControllerDataProvider and avoid repeated counting
+    // TODO: here. It would be premature to do it now. But with more use case, we can improve the
+    // TODO: ResourceControllerDataProvider to calculate by default.
+    if (currentStateOutput.getCurrentStateMap(resourceName, partition).values().stream()
+        .filter(state -> state.equals(stateModelDef.getTopState())).count() > 1) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition, StateModelDefinition stateModelDef,
+      List<String> preferenceList) {
+    Map<String, String> currentStateMap =
+        currentStateOutput.getCurrentStateMap(resourceName, partition);
+    if (checkCurrentStates(currentStateOutput, resourceName, partition, stateModelDef)) {
+      // This method should not be triggered when the mapping is valid.
+      // Log the warning for debug purposes.
+      LOG.warn("The input current state map {} is valid, return the original current state.",
+          currentStateMap);
+      return currentStateMap;
+    }
+
+    Map<String, String> recoverMap = new HashMap<>(currentStateMap);
+    String recoveryState = stateModelDef
+        .getNextStateForTransition(stateModelDef.getTopState(), stateModelDef.getInitialState());
+
+    // 1. We have to reset the expected top state replica host if it is hosting the top state
+    // replica. Otherwise, the old master replica with the possible stale data will never be reset
+    // there.
+    if (preferenceList != null && !preferenceList.isEmpty()) {
+      String expectedTopStateHost = preferenceList.get(0);
+      if (recoverMap.get(expectedTopStateHost).equals(stateModelDef.getTopState())) {
+        recoverMap.put(expectedTopStateHost, recoveryState);
+      }
+    }
+
+    // 2. To minimize the impact of the resolution, we want to reserve one top state replica even
+    // during the recovery process.
+    boolean hasReservedTopState = false;
+    for (String instance : recoverMap.keySet()) {
+      if (recoverMap.get(instance).equals(stateModelDef.getTopState())) {
+        if (hasReservedTopState) {
+          recoverMap.put(instance, recoveryState);
+        } else {
+          hasReservedTopState = true;
+        }
+      }
+    }
+    // Here's what we expect to happen next:
+    // 1. The ideal partition assignment is changed to the proposed recovery state. Then the current
+    // rebalance pipeline proceeds. State transition messages will be sent accordingly.
+    // 2. When the next rebalance pipeline starts, the new current state may still contain
+    // abnormality if the participants have not finished state transition yet. Then the resolver
+    // continues to fix the states with the same logic.
+    // 3. Otherwise, if the new current state contains only one top state replica, then we will hand
+    // it over to the regular rebalancer logic. The rebalancer will trigger the state transition to
+    // bring the top state back in the expected allocation.
+    // And the masters with potential stale data will be all reset by then.
+    return recoverMap;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
index 4718921..00a2f68 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
@@ -33,7 +33,7 @@ import org.apache.helix.model.StateModelDefinition;
  */
 public class MockAbnormalStateResolver implements AbnormalStateResolver {
   @Override
-  public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+  public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
       final String resourceName, final Partition partition,
       final StateModelDefinition stateModelDef) {
     // By default, all current states are valid.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
index dde2644..e10645f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
@@ -19,16 +19,33 @@ package org.apache.helix.integration.rebalancer;
  * under the License.
  */
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.ConfigAccessor;
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
 import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.ExcessiveTopStateResolver;
 import org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -64,4 +81,104 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
     clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
     configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
   }
+
+  @Test(dependsOnMethods = "testConfigureResolver")
+  public void testExcessiveTopStateResolver() {
+    BestPossibleExternalViewVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(verifier.verify());
+    // Inject a double-master partition and compare the fix with and without the resolver.
+    // 1. Find a partition with a MASTER replica and a SLAVE replica
+    HelixAdmin admin = new ZKHelixAdmin.Builder().setZkAddress(ZK_ADDR).build();
+    ExternalView ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
+    String targetPartition = ev.getPartitionSet().iterator().next();
+    Map<String, String> partitionAssignment = ev.getStateMap(targetPartition);
+    String slaveHost =
+        partitionAssignment.entrySet().stream().filter(entry -> entry.getValue().equals("SLAVE"))
+            .findAny().get().getKey();
+    long previousMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+
+    // Build a SLAVE to MASTER state transition message for the SLAVE host.
+    String msgId = new UUID(123, 456).toString();
+    Message msg =
+        createMessage(Message.MessageType.STATE_TRANSITION, msgId, "SLAVE", "MASTER", TEST_DB,
+            slaveHost);
+    msg.setStateModelDef(MasterSlaveSMD.name);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName(slaveHost);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(true);
+    cr.setPartition(targetPartition);
+    cr.setResource(TEST_DB);
+    cr.setClusterName(CLUSTER_NAME);
+
+    AsyncCallback callback = new AsyncCallback() {
+      @Override
+      public void onTimeOut() {
+        Assert.fail("The test state transition timeout.");
+      }
+
+      @Override
+      public void onReplyMessage(Message message) {
+        Assert.assertEquals(message.getMsgState(), Message.MessageState.READ);
+      }
+    };
+
+    // 2. Send the SLAVE to MASTER message to the SLAVE host to create a double-master partition.
+
+    // 2.A. Without a resolver, the default rebalancer logic does not completely fix the issue.
+    _controller.getMessagingService()
+        .sendAndWait(cr, msg, callback, (int) TestHelper.WAIT_DURATION);
+    // Wait until the partition states converge, then verify the result is as expected.
+    verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(verifier.verify());
+    ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
+    Assert.assertEquals(
+        ev.getStateMap(targetPartition).values().stream().filter(state -> state.equals("MASTER"))
+            .count(), 1);
+    // Without the resolver, the default fix never resets the original master, so its update time
+    // does not advance and any stale-data issue on it remains unfixed.
+    long currentMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+    Assert.assertFalse(currentMasterUpdateTime > previousMasterUpdateTime);
+
+    // 2.B. With the resolver configured, the fix is complete.
+    ConfigAccessor configAccessor = new ConfigAccessor.Builder().setZkAddress(ZK_ADDR).build();
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setAbnormalStateResolverMap(
+        ImmutableMap.of(MasterSlaveSMD.name, ExcessiveTopStateResolver.class.getName()));
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    _controller.getMessagingService()
+        .sendAndWait(cr, msg, callback, (int) TestHelper.WAIT_DURATION);
+    // Wait until the partition states converge, then verify the result is as expected.
+    Assert.assertTrue(verifier.verify());
+    ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
+    Assert.assertEquals(
+        ev.getStateMap(targetPartition).values().stream().filter(state -> state.equals("MASTER"))
+            .count(), 1);
+    // Now that the resolver is used in the fix process, the original master has also been reset,
+    // so the potential data issue has been fixed as well.
+    currentMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+    Assert.assertTrue(currentMasterUpdateTime > previousMasterUpdateTime);
+
+    // Reset the resolver map. NOTE(review): skipped if an assertion above fails; use finally.
+    clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+  }
+
+  private long getTopStateUpdateTime(ExternalView ev, String partition, String state) {
+    String topStateHost = ev.getStateMap(partition).entrySet().stream()
+        .filter(entry -> entry.getValue().equals(state)).findFirst().get().getKey();
+    MockParticipantManager participant = Arrays.stream(_participants)
+        .filter(instance -> instance.getInstanceName().equals(topStateHost)).findFirst().get();
+    // Use the partition's end time in that host's current state as the "update time".
+    HelixDataAccessor accessor = _controller.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    CurrentState currentState = accessor.getProperty(keyBuilder
+        .currentState(participant.getInstanceName(), participant.getSessionId(),
+            ev.getResourceName()));
+    return currentState.getEndTime(partition);
+  }
 }


[helix] 03/03: Add monitor to record the abnormal states processing. (#1059)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit f85cbd14702ada941709d0db00ce69a78bdf52e3
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Mon Jun 15 11:53:07 2020 -0700

    Add monitor to record the abnormal states processing. (#1059)
    
    Example ObjectName of the new monitor MBean: Rebalancer:ClusterName=<clusterName>, EntityName=AbnormalStates.<StateModelDefName>
    Attributes:
    1. AbnormalStatePartitionCounter: records the total count of partitions that have been found in an abnormal state. Note that if a partition is found to be abnormal twice, it is also counted twice.
    2. RecoveryAttemptCounter: records the total count of successful recovery computations done by the resolver.
---
 .../constraint/AbnormalStateResolver.java          |  18 ----
 .../dataproviders/BaseControllerDataProvider.java  |  41 ++++++--
 .../controller/rebalancer/AbstractRebalancer.java  |  19 ++--
 .../rebalancer/DelayedAutoRebalancer.java          |   6 +-
 .../constraint/MonitoredAbnormalResolver.java      | 117 +++++++++++++++++++++
 .../metrics/AbnormalStatesMetricCollector.java     |  67 ++++++++++++
 .../rebalancer/TestAbstractRebalancer.java         |   4 +-
 .../rebalancer/TestAutoRebalanceStrategy.java      |   4 +-
 .../rebalancer/TestZeroReplicaAvoidance.java       |   4 +-
 .../TestAbnormalStatesResolverMonitor.java         |  88 ++++++++++++++++
 .../waged/model/AbstractTestClusterModel.java      |   4 +-
 .../rebalancer/TestAbnormalStatesResolver.java     |  54 ++++++----
 12 files changed, 356 insertions(+), 70 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
index 1a75e0b..309d98d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
+++ b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
@@ -31,24 +31,6 @@ import org.apache.helix.model.StateModelDefinition;
  */
 public interface AbnormalStateResolver {
   /**
-   * A placeholder which will be used when the resolver is not specified.
-   * This is a dummy class that does not really functional.
-   */
-  AbnormalStateResolver DUMMY_STATE_RESOLVER = new AbnormalStateResolver() {
-    public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
-        final String resourceName, final Partition partition,
-        final StateModelDefinition stateModelDef) {
-      // By default, all current states are valid.
-      return true;
-    }
-    public Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
-        final String resourceName, final Partition partition,
-        final StateModelDefinition stateModelDef, final List<String> preferenceList) {
-      throw new UnsupportedOperationException("This resolver won't recover abnormal states.");
-    }
-  };
-
-  /**
    * Check if the current states of the specified partition is valid.
    * @param currentStateOutput
    * @param resourceName
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 59058ab..e2f2ac2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -44,6 +44,7 @@ import org.apache.helix.common.caches.InstanceMessagesCache;
 import org.apache.helix.common.caches.PropertyCache;
 import org.apache.helix.common.controllers.ControlContextProvider;
 import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.CurrentState;
@@ -106,7 +107,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
   private Set<String> _disabledInstanceSet = new HashSet<>();
-  private final Map<String, AbnormalStateResolver> _abnormalStateResolverMap = new HashMap<>();
+  private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
 
   public BaseControllerDataProvider() {
     this(AbstractDataCache.UNKNOWN_CLUSTER, AbstractDataCache.UNKNOWN_PIPELINE);
@@ -391,7 +392,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     return _pipelineName;
   }
 
-
   @Override
   public String getClusterEventId() {
     return _clusterEventId;
@@ -737,10 +737,9 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     _asyncTasksThreadPool = asyncTasksThreadPool;
   }
 
-
-  public AbnormalStateResolver getAbnormalStateResolver(String stateModel) {
+  public MonitoredAbnormalResolver getAbnormalStateResolver(String stateModel) {
     return _abnormalStateResolverMap
-        .getOrDefault(stateModel, AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+        .getOrDefault(stateModel, MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
   }
 
   private void refreshAbnormalStateResolverMap(ClusterConfig clusterConfig) {
@@ -748,22 +747,43 @@ public class BaseControllerDataProvider implements ControlContextProvider {
       logger.debug("Skip refreshing abnormal state resolvers because the ClusterConfig is missing");
       return;
     }
+
     Map<String, String> resolverMap = clusterConfig.getAbnormalStateResolverMap();
     logger.info("Start loading the abnormal state resolvers with configuration {}", resolverMap);
-    // Remove any resolver configuration that does not exist anymore.
-    _abnormalStateResolverMap.keySet().retainAll(resolverMap.keySet());
+    // Calculate all the resolvers to be removed.
+    Map<String, MonitoredAbnormalResolver> removingResolverWraps =
+        new HashMap<>(_abnormalStateResolverMap);
+    removingResolverWraps.keySet().removeAll(resolverMap.keySet());
+    for (MonitoredAbnormalResolver monitoredAbnormalResolver : removingResolverWraps.values()) {
+      monitoredAbnormalResolver.close();
+    }
+
     // Reload the resolver classes into cache based on the configuration.
+    _abnormalStateResolverMap.keySet().retainAll(resolverMap.keySet());
     for (String stateModel : resolverMap.keySet()) {
       String resolverClassName = resolverMap.get(stateModel);
       if (resolverClassName == null || resolverClassName.isEmpty()) {
         // skip the empty definition.
         continue;
       }
-      if (!resolverClassName.equals(getAbnormalStateResolver(stateModel).getClass().getName())) {
+
+      MonitoredAbnormalResolver currentMonitoredResolver =
+          _abnormalStateResolverMap.get(stateModel);
+      if (currentMonitoredResolver == null || !resolverClassName
+          .equals(currentMonitoredResolver.getResolverClass().getName())) {
+
+        if (currentMonitoredResolver != null) {
+          // Clean up the existing monitored resolver.
+          // We must close the existing object first to ensure the metric being removed before the
+          // new one can be registered normally.
+          currentMonitoredResolver.close();
+        }
+
         try {
-          AbnormalStateResolver resolver = AbnormalStateResolver.class
+          AbnormalStateResolver newResolver = AbnormalStateResolver.class
               .cast(HelixUtil.loadClass(getClass(), resolverClassName).newInstance());
-          _abnormalStateResolverMap.put(stateModel, resolver);
+          _abnormalStateResolverMap.put(stateModel,
+              new MonitoredAbnormalResolver(newResolver, _clusterName, stateModel));
         } catch (Exception e) {
           throw new HelixException(String
               .format("Failed to instantiate the abnormal state resolver %s for state model %s",
@@ -771,6 +791,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
         }
       } // else, nothing to update since the same resolver class has been loaded.
     }
+
     logger.info("Finish loading the abnormal state resolvers {}", _abnormalStateResolverMap);
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index a1ca6ec..f118bcb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -33,9 +33,9 @@ import java.util.Set;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -192,17 +192,17 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
    * @param idealState
    * @param clusterConfig
    * @param partition
-   * @param resolver
+   * @param monitoredResolver
    * @return
    */
   protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
       IdealState idealState, ClusterConfig clusterConfig, Partition partition,
-      AbnormalStateResolver resolver) {
+      MonitoredAbnormalResolver monitoredResolver) {
     Optional<Map<String, String>> optionalOverwrittenStates =
         computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
-            idealState, partition, resolver);
+            idealState, partition, monitoredResolver);
     if (optionalOverwrittenStates.isPresent()) {
       return optionalOverwrittenStates.get();
     }
@@ -221,14 +221,14 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
    * @param currentStateOutput
    * @param idealState
    * @param partition
-   * @param resolver
+   * @param monitoredResolver
    * @return An optional object which contains the assignment map if overwritten is necessary.
    * Otherwise return Optional.empty().
    */
   protected Optional<Map<String, String>> computeStatesOverwriteForPartition(
       final StateModelDefinition stateModelDef, final List<String> preferenceList,
       final CurrentStateOutput currentStateOutput, IdealState idealState, final Partition partition,
-      final AbnormalStateResolver resolver) {
+      final MonitoredAbnormalResolver monitoredResolver) {
     String resourceName = idealState.getResourceName();
     Map<String, String> currentStateMap =
         currentStateOutput.getCurrentStateMap(resourceName, partition);
@@ -245,8 +245,10 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
     }
 
     // (3) If the current states are not valid, fix the invalid part first.
-    if (!resolver.checkCurrentStates(currentStateOutput, resourceName, partition, stateModelDef)) {
-      Map<String, String> recoveryAssignment = resolver
+    if (!monitoredResolver
+        .checkCurrentStates(currentStateOutput, resourceName, partition, stateModelDef)) {
+      monitoredResolver.recordAbnormalState();
+      Map<String, String> recoveryAssignment = monitoredResolver
           .computeRecoveryAssignment(currentStateOutput, resourceName, partition, stateModelDef,
               preferenceList);
       if (recoveryAssignment == null || !recoveryAssignment.keySet()
@@ -255,6 +257,7 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
             "Invalid recovery assignment %s since it changed the current partition placement %s",
             recoveryAssignment, currentStateMap));
       }
+      monitoredResolver.recordRecoveryAttempt();
       return Optional.of(recoveryAssignment);
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index f169e07..e0fe24d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -32,8 +32,8 @@ import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
@@ -283,10 +283,10 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
       IdealState idealState, ClusterConfig clusterConfig, Partition partition,
-      AbnormalStateResolver resolver) {
+      MonitoredAbnormalResolver monitoredResolver) {
     Optional<Map<String, String>> optionalOverwrittenStates =
         computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
-            idealState, partition, resolver);
+            idealState, partition, monitoredResolver);
     if (optionalOverwrittenStates.isPresent()) {
       return optionalOverwrittenStates.get();
     }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/MonitoredAbnormalResolver.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/MonitoredAbnormalResolver.java
new file mode 100644
index 0000000..85f633a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/MonitoredAbnormalResolver.java
@@ -0,0 +1,167 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.metrics.AbnormalStatesMetricCollector;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+
+/**
+ * A wrapper class that adds monitoring to an {@link AbnormalStateResolver} implementation.
+ *
+ * <p>All resolver calls are delegated to the wrapped instance. The rebalancer calls
+ * {@link #recordAbnormalState()} and {@link #recordRecoveryAttempt()} to update the
+ * counters exposed through the {@link AbnormalStatesMetricCollector}.
+ */
+public class MonitoredAbnormalResolver implements AbnormalStateResolver {
+  private final AbnormalStateResolver _resolver;
+  // Null for DUMMY_STATE_RESOLVER; all metric operations are skipped in that case.
+  private final AbnormalStatesMetricCollector _metricCollector;
+
+  /**
+   * A placeholder which is used when no resolver is configured for a state model.
+   * It treats all current states as valid and never computes a recovery assignment. It has no
+   * metric collector, so the record methods are no-ops on it.
+   */
+  public final static MonitoredAbnormalResolver DUMMY_STATE_RESOLVER =
+      new MonitoredAbnormalResolver(new AbnormalStateResolver() {
+        @Override
+        public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
+            final String resourceName, final Partition partition,
+            final StateModelDefinition stateModelDef) {
+          // By default, all current states are valid.
+          return true;
+        }
+
+        @Override
+        public Map<String, String> computeRecoveryAssignment(
+            final CurrentStateOutput currentStateOutput, final String resourceName,
+            final Partition partition, final StateModelDefinition stateModelDef,
+            final List<String> preferenceList) {
+          throw new UnsupportedOperationException("This resolver won't recover abnormal states.");
+        }
+      }, null);
+
+  private MonitoredAbnormalResolver(AbnormalStateResolver resolver,
+      AbnormalStatesMetricCollector metricCollector) {
+    _resolver = requireUnmonitored(resolver);
+    _metricCollector = metricCollector;
+  }
+
+  /**
+   * Wrap the given resolver and register an AbnormalStates metric collector for it.
+   *
+   * @param resolver the resolver to monitor; must not be a MonitoredAbnormalResolver itself
+   * @param clusterName the cluster name, used to identify the metric MBean
+   * @param stateModelDef the state model definition name, used to identify the metric MBean
+   * @throws IllegalArgumentException if resolver is already a MonitoredAbnormalResolver
+   */
+  public MonitoredAbnormalResolver(AbnormalStateResolver resolver, String clusterName,
+      String stateModelDef) {
+    // Validate the resolver before the collector is created (arguments are evaluated left to
+    // right), so a rejected argument does not leave a registered MBean behind.
+    this(requireUnmonitored(resolver),
+        new AbnormalStatesMetricCollector(clusterName, stateModelDef));
+  }
+
+  /**
+   * Nested monitoring wrappers are not supported.
+   *
+   * @return the given resolver, unchanged
+   * @throws IllegalArgumentException if resolver is a MonitoredAbnormalResolver
+   */
+  private static AbnormalStateResolver requireUnmonitored(AbnormalStateResolver resolver) {
+    if (resolver instanceof MonitoredAbnormalResolver) {
+      throw new IllegalArgumentException(
+          "Cannot construct a MonitoredAbnormalResolver wrap object using another MonitoredAbnormalResolver object.");
+    }
+    return resolver;
+  }
+
+  /**
+   * Increment the counter of partitions that were found with abnormal current states.
+   * This is a no-op if there is no metric collector, e.g. for {@link #DUMMY_STATE_RESOLVER}.
+   */
+  public void recordAbnormalState() {
+    incrementCounter(
+        AbnormalStatesMetricCollector.AbnormalStatesMetricNames.AbnormalStatePartitionCounter);
+  }
+
+  /**
+   * Increment the counter of recovery attempts made by the wrapped resolver.
+   * This is a no-op if there is no metric collector, e.g. for {@link #DUMMY_STATE_RESOLVER}.
+   */
+  public void recordRecoveryAttempt() {
+    incrementCounter(
+        AbnormalStatesMetricCollector.AbnormalStatesMetricNames.RecoveryAttemptCounter);
+  }
+
+  private void incrementCounter(
+      AbnormalStatesMetricCollector.AbnormalStatesMetricNames metricName) {
+    if (_metricCollector != null) {
+      _metricCollector.getMetric(metricName.name(), CountMetric.class).increment(1);
+    }
+  }
+
+  /**
+   * @return the class of the wrapped resolver implementation
+   */
+  public Class<? extends AbnormalStateResolver> getResolverClass() {
+    return _resolver.getClass();
+  }
+
+  @Override
+  public boolean checkCurrentStates(CurrentStateOutput currentStateOutput, String resourceName,
+      Partition partition, StateModelDefinition stateModelDef) {
+    return _resolver
+        .checkCurrentStates(currentStateOutput, resourceName, partition, stateModelDef);
+  }
+
+  @Override
+  public Map<String, String> computeRecoveryAssignment(CurrentStateOutput currentStateOutput,
+      String resourceName, Partition partition, StateModelDefinition stateModelDef,
+      List<String> preferenceList) {
+    return _resolver
+        .computeRecoveryAssignment(currentStateOutput, resourceName, partition, stateModelDef,
+            preferenceList);
+  }
+
+  /**
+   * Unregister the metric MBean, if there is one.
+   */
+  public void close() {
+    if (_metricCollector != null) {
+      _metricCollector.unregister();
+    }
+  }
+
+  // NOTE(review): finalize() is deprecated since Java 9 and only acts as a best-effort safety
+  // net for callers that do not call close(); it can also run after close() was already called.
+  @Override
+  public void finalize() {
+    close();
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/AbnormalStatesMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/AbnormalStatesMetricCollector.java
new file mode 100644
index 0000000..a124624
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/AbnormalStatesMetricCollector.java
@@ -0,0 +1,78 @@
+package org.apache.helix.monitoring.metrics;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import javax.management.JMException;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.metrics.implementation.RebalanceCounter;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+
+/**
+ * Collects the metrics of the abnormal states resolver for one state model definition of a
+ * cluster. The metric MBean belongs to the Rebalancer monitor domain and uses the entity name
+ * {@code AbnormalStates.<stateModelDef>}.
+ */
+public class AbnormalStatesMetricCollector extends MetricCollector {
+  private static final String ABNORMAL_STATES_ENTITY_NAME = "AbnormalStates";
+
+  /**
+   * This enum class contains all metric names defined for AbnormalStateResolver.
+   * Note that all enums are in camel case for readability, since their names are used directly
+   * as the metric names.
+   */
+  public enum AbnormalStatesMetricNames {
+    // The counter of the partitions that contain abnormal states.
+    AbnormalStatePartitionCounter,
+    // The counter of the attempts that the resolver makes to recover abnormal states.
+    RecoveryAttemptCounter
+  }
+
+  /**
+   * @param clusterName the cluster name; if null, the metrics are created but not registered
+   * @param stateModelDef the state model definition name, appended to the MBean entity name
+   * @throws HelixException if registering the MBean fails
+   */
+  public AbnormalStatesMetricCollector(String clusterName, String stateModelDef) {
+    super(MonitorDomainNames.Rebalancer.name(), clusterName,
+        String.format("%s.%s", ABNORMAL_STATES_ENTITY_NAME, stateModelDef));
+    createMetrics();
+    if (clusterName != null) {
+      try {
+        register();
+      } catch (JMException e) {
+        throw new HelixException(
+            "Failed to register MBean for the " + AbnormalStatesMetricCollector.class
+                .getSimpleName(), e);
+      }
+    }
+  }
+
+  /**
+   * Create one counter per {@link AbnormalStatesMetricNames} constant, in declaration order.
+   */
+  private void createMetrics() {
+    for (AbnormalStatesMetricNames metricName : AbnormalStatesMetricNames.values()) {
+      CountMetric counter = new RebalanceCounter(metricName.name());
+      addMetric(counter);
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
index 72bb726..6ecaa30 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
@@ -23,7 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -54,7 +54,7 @@ public class TestAbstractRebalancer {
             BuiltInStateModelDefinitions.valueOf(stateModelName).getStateModelDefinition(),
             preferenceList, currentStateOutput, new HashSet<>(disabledInstancesForPartition),
             new IdealState("test"), new ClusterConfig("TestCluster"), partition,
-            AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+            MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
 
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap);
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index 0d09079..a4bbf52 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -41,8 +41,8 @@ import com.google.common.collect.Sets;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.MockAccessor;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -245,7 +245,7 @@ public class TestAutoRebalanceStrategy {
         Map<String, String> assignment = new AutoRebalancer()
             .computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), _stateModelDef,
                 preferenceList, currentStateOutput, disabled, is, clusterConfig, p,
-                AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+                MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
         mapResult.put(partition, assignment);
       }
       return mapResult;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
index 33885ca..817aa62 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
@@ -32,7 +32,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -89,7 +89,7 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
     Map<String, String> bestPossibleMap = rebalancer
         .computeBestPossibleStateForPartition(liveInstances, stateModelDef, instancePreferenceList,
             currentStateOutput, Collections.emptySet(), is, new ClusterConfig("TestCluster"),
-            partition, AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+            partition, MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap,
         "Differs, get " + bestPossibleMap + "\nexpected: " + expectedBestPossibleMap
             + "\ncurrentState: " + currentStateMap + "\npreferenceList: " + instancePreferenceList);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/TestAbnormalStatesResolverMonitor.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/TestAbnormalStatesResolverMonitor.java
new file mode 100644
index 0000000..98ff87a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/TestAbnormalStatesResolverMonitor.java
@@ -0,0 +1,88 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+import java.util.Random;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.metrics.AbnormalStatesMetricCollector;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestAbnormalStatesResolverMonitor {
+  private static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer();
+  private final String CLUSTER_NAME = "TestCluster";
+
+  @Test
+  public void testMonitorResolver()
+      throws MalformedObjectNameException, AttributeNotFoundException, MBeanException,
+      ReflectionException, InstanceNotFoundException {
+    final String testResolverMonitorMbeanName = String
+        .format("%s:%s=%s, %s=%s.%s", MonitorDomainNames.Rebalancer, "ClusterName", CLUSTER_NAME,
+            "EntityName", "AbnormalStates", MasterSlaveSMD.name);
+    final ObjectName testResolverMonitorMbeanObjectName =
+        new ObjectName(testResolverMonitorMbeanName);
+
+    Assert.assertFalse(MBEAN_SERVER.isRegistered(testResolverMonitorMbeanObjectName));
+
+    // Update the resolver configuration for MasterSlave state model.
+    MonitoredAbnormalResolver monitoredAbnormalResolver =
+        new MonitoredAbnormalResolver(new MockAbnormalStateResolver(), CLUSTER_NAME,
+            MasterSlaveSMD.name);
+
+    // Validate if the MBean has been registered
+    Assert.assertTrue(MBEAN_SERVER.isRegistered(testResolverMonitorMbeanObjectName));
+    Assert.assertEquals(MBEAN_SERVER.getAttribute(testResolverMonitorMbeanObjectName,
+        AbnormalStatesMetricCollector.AbnormalStatesMetricNames.AbnormalStatePartitionCounter
+            .name()), 0L);
+    Assert.assertEquals(MBEAN_SERVER.getAttribute(testResolverMonitorMbeanObjectName,
+        AbnormalStatesMetricCollector.AbnormalStatesMetricNames.RecoveryAttemptCounter.name()), 0L);
+    // Validate if the metrics recording methods work as expected
+    Random ran = new Random(System.currentTimeMillis());
+    Long expectation = 1L + ran.nextInt(10);
+    for (int i = 0; i < expectation; i++) {
+      monitoredAbnormalResolver.recordAbnormalState();
+    }
+    Assert.assertEquals(MBEAN_SERVER.getAttribute(testResolverMonitorMbeanObjectName,
+        AbnormalStatesMetricCollector.AbnormalStatesMetricNames.AbnormalStatePartitionCounter
+            .name()), expectation);
+    expectation = 1L + ran.nextInt(10);
+    for (int i = 0; i < expectation; i++) {
+      monitoredAbnormalResolver.recordRecoveryAttempt();
+    }
+    Assert.assertEquals(MBEAN_SERVER.getAttribute(testResolverMonitorMbeanObjectName,
+        AbnormalStatesMetricCollector.AbnormalStatesMetricNames.RecoveryAttemptCounter.name()),
+        expectation);
+
+    // Reset the resolver map
+    monitoredAbnormalResolver.close();
+    // Validate if the MBean has been unregistered
+    Assert.assertFalse(MBEAN_SERVER.isRegistered(testResolverMonitorMbeanObjectName));
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index ca8fd53..e09d090 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -29,8 +29,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
@@ -112,7 +112,7 @@ public abstract class AbstractTestClusterModel {
     testClusterConfig.setTopologyAwareEnabled(true);
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
     when(testCache.getAbnormalStateResolver(any()))
-        .thenReturn(AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+        .thenReturn(MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
 
     // 3. Mock the live instance node for the default instance.
     LiveInstance testLiveInstance = createMockLiveInstance(_testInstanceId);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
index e10645f..5112491 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
@@ -32,10 +32,10 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.constraint.ExcessiveTopStateResolver;
 import org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -50,14 +50,18 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
+  // TODO: remove this wait time once we have a better way to determine if the rebalance has been
+  // TODO: done as a reaction of the test operations.
+  protected static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = 1000;
+
   @Test
   public void testConfigureResolver() {
     ResourceControllerDataProvider cache = new ResourceControllerDataProvider(CLUSTER_NAME);
     // Verify the initial setup.
     cache.refresh(_controller.getHelixDataAccessor());
     for (String stateModelDefName : cache.getStateModelDefMap().keySet()) {
-      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getClass(),
-          AbnormalStateResolver.DUMMY_STATE_RESOLVER.getClass());
+      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getResolverClass(),
+          MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER.getResolverClass());
     }
 
     // Update the resolver configuration for MasterSlave state model.
@@ -70,10 +74,10 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
     cache.requireFullRefresh();
     cache.refresh(_controller.getHelixDataAccessor());
     for (String stateModelDefName : cache.getStateModelDefMap().keySet()) {
-      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getClass(),
+      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getResolverClass(),
           stateModelDefName.equals(MasterSlaveSMD.name) ?
               MockAbnormalStateResolver.class :
-              AbnormalStateResolver.DUMMY_STATE_RESOLVER.getClass());
+              MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER.getResolverClass());
     }
 
     // Reset the resolver map
@@ -83,7 +87,7 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
   }
 
   @Test(dependsOnMethods = "testConfigureResolver")
-  public void testExcessiveTopStateResolver() {
+  public void testExcessiveTopStateResolver() throws InterruptedException {
     BestPossibleExternalViewVerifier verifier =
         new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
     Assert.assertTrue(verifier.verify());
@@ -93,16 +97,17 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
     ExternalView ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
     String targetPartition = ev.getPartitionSet().iterator().next();
     Map<String, String> partitionAssignment = ev.getStateMap(targetPartition);
-    String slaveHost =
-        partitionAssignment.entrySet().stream().filter(entry -> entry.getValue().equals("SLAVE"))
-            .findAny().get().getKey();
-    long previousMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+    String slaveHost = partitionAssignment.entrySet().stream()
+        .filter(entry -> entry.getValue().equals(MasterSlaveSMD.States.SLAVE.name())).findAny()
+        .get().getKey();
+    long previousMasterUpdateTime =
+        getTopStateUpdateTime(ev, targetPartition, MasterSlaveSMD.States.MASTER.name());
 
     // Build SLAVE to MASTER message
     String msgId = new UUID(123, 456).toString();
-    Message msg =
-        createMessage(Message.MessageType.STATE_TRANSITION, msgId, "SLAVE", "MASTER", TEST_DB,
-            slaveHost);
+    Message msg = createMessage(Message.MessageType.STATE_TRANSITION, msgId,
+        MasterSlaveSMD.States.SLAVE.name(), MasterSlaveSMD.States.MASTER.name(), TEST_DB,
+        slaveHost);
     msg.setStateModelDef(MasterSlaveSMD.name);
 
     Criteria cr = new Criteria();
@@ -130,17 +135,18 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
     // 2.A. Without resolver, the fixing is not completely done by the default rebalancer logic.
     _controller.getMessagingService()
         .sendAndWait(cr, msg, callback, (int) TestHelper.WAIT_DURATION);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     // Wait until the partition status is fixed, verify if the result is as expected
     verifier =
         new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
-    Assert.assertTrue(verifier.verify());
+    Assert.assertTrue(verifier.verifyByPolling());
     ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
-    Assert.assertEquals(
-        ev.getStateMap(targetPartition).values().stream().filter(state -> state.equals("MASTER"))
-            .count(), 1);
+    Assert.assertEquals(ev.getStateMap(targetPartition).values().stream()
+        .filter(state -> state.equals(MasterSlaveSMD.States.MASTER.name())).count(), 1);
     // Since the resolver is not used in the auto default fix process, there is no update on the
     // original master. So if there is any data issue, it was not fixed.
-    long currentMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+    long currentMasterUpdateTime =
+        getTopStateUpdateTime(ev, targetPartition, MasterSlaveSMD.States.MASTER.name());
     Assert.assertFalse(currentMasterUpdateTime > previousMasterUpdateTime);
 
     // 2.B. with resolver configured, the fixing is complete.
@@ -149,17 +155,19 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
     clusterConfig.setAbnormalStateResolverMap(
         ImmutableMap.of(MasterSlaveSMD.name, ExcessiveTopStateResolver.class.getName()));
     configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
     _controller.getMessagingService()
         .sendAndWait(cr, msg, callback, (int) TestHelper.WAIT_DURATION);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     // Wait until the partition status is fixed, verify if the result is as expected
-    Assert.assertTrue(verifier.verify());
+    Assert.assertTrue(verifier.verifyByPolling());
     ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
-    Assert.assertEquals(
-        ev.getStateMap(targetPartition).values().stream().filter(state -> state.equals("MASTER"))
-            .count(), 1);
+    Assert.assertEquals(ev.getStateMap(targetPartition).values().stream()
+        .filter(state -> state.equals(MasterSlaveSMD.States.MASTER.name())).count(), 1);
     // Now the resolver is used in the auto fix process, the original master has also been refreshed.
     // The potential data issue has been fixed in this process.
-    currentMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+    currentMasterUpdateTime =
+        getTopStateUpdateTime(ev, targetPartition, MasterSlaveSMD.States.MASTER.name());
     Assert.assertTrue(currentMasterUpdateTime > previousMasterUpdateTime);
 
     // Reset the resolver map


[helix] 01/03: Add Abnormal States Resolver interface and configuration item. (#1028)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit dc75f228522c930cb40db1b0a16a69b3eb77d953
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Thu May 28 15:55:33 2020 -0700

    Add Abnormal States Resolver interface and configuration item. (#1028)
    
    The Abnormal States Resolver defines a generic interface to detect and recover from abnormal current states of a partition. For example,
    - double masters
    - application data out of sync
    The interface should be implemented according to the application's requirements.
    
    The resolver is applied in the rebalance process according to the corresponding cluster config item. For example,
    "ABNORMAL_STATES_RESOLVER_MAP" : {
     "MASTERSLAVE" : "org.apache.helix.api.rebalancer.constraint.MasterSlaveAbnormalStateResolver"
    }
    Without any configuration, the default behavior is to perform no recovery.
---
 .../constraint/AbnormalStateResolver.java          | 75 ++++++++++++++++++
 .../dataproviders/BaseControllerDataProvider.java  | 45 ++++++++++-
 .../controller/rebalancer/AbstractRebalancer.java  | 89 +++++++++++++++++-----
 .../rebalancer/DelayedAutoRebalancer.java          | 40 +++-------
 .../java/org/apache/helix/model/ClusterConfig.java | 28 ++++++-
 .../rebalancer/TestAbstractRebalancer.java         |  4 +-
 .../rebalancer/TestAutoRebalanceStrategy.java      |  6 +-
 .../rebalancer/TestZeroReplicaAvoidance.java       | 10 ++-
 .../constraint/MockAbnormalStateResolver.java      | 48 ++++++++++++
 .../waged/model/AbstractTestClusterModel.java      |  4 +
 .../rebalancer/TestAbnormalStatesResolver.java     | 67 ++++++++++++++++
 .../org/apache/helix/model/TestClusterConfig.java  | 16 ++++
 12 files changed, 376 insertions(+), 56 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
new file mode 100644
index 0000000..7e9946c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
@@ -0,0 +1,75 @@
+package org.apache.helix.api.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * A generic interface to find and recover if the partition has abnormal current states.
+ */
+public interface AbnormalStateResolver {
+  /**
+   * A placeholder which will be used when the resolver is not specified.
+   * This is a dummy class that does not really functional.
+   */
+  AbnormalStateResolver DUMMY_STATE_RESOLVER = new AbnormalStateResolver() {
+    public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+        final String resourceName, final Partition partition,
+        final StateModelDefinition stateModelDef) {
+      // By default, all current states are valid.
+      return true;
+    }
+    public Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
+        final String resourceName, final Partition partition,
+        final StateModelDefinition stateModelDef, final List<String> preferenceList) {
+      throw new UnsupportedOperationException("This resolver won't recover abnormal states.");
+    }
+  };
+
+  /**
+   * Check if the current states of the specified partition is valid.
+   * @param currentStateOutput
+   * @param resourceName
+   * @param partition
+   * @param stateModelDef
+   * @return true if the current states of the specified partition is valid.
+   */
+  boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef);
+
+  /**
+   * Compute a transient partition state assignment to fix the abnormal.
+   * @param currentStateOutput
+   * @param resourceName
+   * @param partition
+   * @param stateModelDef
+   * @param preferenceList
+   * @return the transient partition state assignment which remove the abnormal states.
+   */
+  Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef, final List<String> preferenceList);
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index a24ea46..59058ab 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -34,8 +34,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.common.caches.AbstractDataCache;
 import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.common.caches.InstanceMessagesCache;
@@ -53,6 +55,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,6 +106,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
   private Set<String> _disabledInstanceSet = new HashSet<>();
+  private final Map<String, AbnormalStateResolver> _abnormalStateResolverMap = new HashMap<>();
 
   public BaseControllerDataProvider() {
     this(AbstractDataCache.UNKNOWN_CLUSTER, AbstractDataCache.UNKNOWN_PIPELINE);
@@ -225,6 +229,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CLUSTER_CONFIG).getAndSet(false)) {
       _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
       refreshedType.add(HelixConstants.ChangeType.CLUSTER_CONFIG);
+      refreshAbnormalStateResolverMap(_clusterConfig);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String.format(
           "No ClusterConfig change for cluster %s, pipeline %s", _clusterName, getPipelineName()));
@@ -372,6 +377,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
 
   public void setClusterConfig(ClusterConfig clusterConfig) {
     _clusterConfig = clusterConfig;
+    refreshAbnormalStateResolverMap(_clusterConfig);
   }
 
   @Override
@@ -731,6 +737,43 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     _asyncTasksThreadPool = asyncTasksThreadPool;
   }
 
+
+  public AbnormalStateResolver getAbnormalStateResolver(String stateModel) {
+    return _abnormalStateResolverMap
+        .getOrDefault(stateModel, AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+  }
+
+  private void refreshAbnormalStateResolverMap(ClusterConfig clusterConfig) {
+    if (clusterConfig == null) {
+      logger.debug("Skip refreshing abnormal state resolvers because the ClusterConfig is missing");
+      return;
+    }
+    Map<String, String> resolverMap = clusterConfig.getAbnormalStateResolverMap();
+    logger.info("Start loading the abnormal state resolvers with configuration {}", resolverMap);
+    // Remove any resolver configuration that does not exist anymore.
+    _abnormalStateResolverMap.keySet().retainAll(resolverMap.keySet());
+    // Reload the resolver classes into cache based on the configuration.
+    for (String stateModel : resolverMap.keySet()) {
+      String resolverClassName = resolverMap.get(stateModel);
+      if (resolverClassName == null || resolverClassName.isEmpty()) {
+        // skip the empty definition.
+        continue;
+      }
+      if (!resolverClassName.equals(getAbnormalStateResolver(stateModel).getClass().getName())) {
+        try {
+          AbnormalStateResolver resolver = AbnormalStateResolver.class
+              .cast(HelixUtil.loadClass(getClass(), resolverClassName).newInstance());
+          _abnormalStateResolverMap.put(stateModel, resolver);
+        } catch (Exception e) {
+          throw new HelixException(String
+              .format("Failed to instantiate the abnormal state resolver %s for state model %s",
+                  resolverClassName, stateModel));
+        }
+      } // else, nothing to update since the same resolver class has been loaded.
+    }
+    logger.info("Finish loading the abnormal state resolvers {}", _abnormalStateResolverMap);
+  }
+
   public boolean isMaintenanceModeEnabled() {
     return _isMaintenanceModeEnabled;
   }
@@ -768,4 +811,4 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   public String toString() {
     return genCacheContentStringBuilder().toString();
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index e85a2df..6fdd0b6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -27,11 +27,13 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
@@ -102,7 +104,8 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
       Map<String, String> bestStateForPartition =
           computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef,
               preferenceList, currentStateOutput, disabledInstancesForPartition, idealState,
-              cache.getClusterConfig(), partition);
+              cache.getClusterConfig(), partition,
+              cache.getAbnormalStateResolver(stateModelDefName));
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
@@ -179,44 +182,97 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
     return rebalanceStrategy;
   }
 
+  /**
+   * Compute best state for partition in AUTO ideal state mode.
+   * @param liveInstances
+   * @param stateModelDef
+   * @param preferenceList
+   * @param currentStateOutput instance->state for each partition
+   * @param disabledInstancesForPartition
+   * @param idealState
+   * @param clusterConfig
+   * @param partition
+   * @param resolver
+   * @return
+   */
   protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
-      IdealState idealState, ClusterConfig clusterConfig, Partition partition) {
+      IdealState idealState, ClusterConfig clusterConfig, Partition partition,
+      AbnormalStateResolver resolver) {
+    Optional<Map<String, String>> optionalOverwrittenStates =
+        computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
+            idealState, partition, resolver);
+    if (optionalOverwrittenStates.isPresent()) {
+      return optionalOverwrittenStates.get();
+    }
 
-    Map<String, String> currentStateMap =
-        currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition);
+    Map<String, String> currentStateMap = new HashMap<>(
+        currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition));
+    return computeBestPossibleMap(preferenceList, stateModelDef, currentStateMap, liveInstances,
+        disabledInstancesForPartition);
+  }
 
-    if (currentStateMap == null) {
-      currentStateMap = Collections.emptyMap();
-    }
+  /**
+   * Compute if an overwritten is necessary for the partition assignment in case that the proposed
+   * assignment is not valid or empty.
+   * @param stateModelDef
+   * @param preferenceList
+   * @param currentStateOutput
+   * @param idealState
+   * @param partition
+   * @param resolver
+   * @return An optional object which contains the assignment map if overwritten is necessary.
+   * Otherwise return Optional.empty().
+   */
+  protected Optional<Map<String, String>> computeStatesOverwriteForPartition(
+      final StateModelDefinition stateModelDef, final List<String> preferenceList,
+      final CurrentStateOutput currentStateOutput, IdealState idealState, final Partition partition,
+      final AbnormalStateResolver resolver) {
+    String resourceName = idealState.getResourceName();
+    Map<String, String> currentStateMap =
+        currentStateOutput.getCurrentStateMap(resourceName, partition);
 
     // (1) If the partition is removed from IS or the IS is deleted.
     // Transit to DROPPED no matter the instance is disabled or not.
     if (preferenceList == null) {
-      return computeBestPossibleMapForDroppedResource(currentStateMap);
+      return Optional.of(computeBestPossibleMapForDroppedResource(currentStateMap));
     }
 
     // (2) If resource disabled altogether, transit to initial-state (e.g. OFFLINE) if it's not in ERROR.
     if (!idealState.isEnabled()) {
-      return computeBestPossibleMapForDisabledResource(currentStateMap, stateModelDef);
+      return Optional.of(computeBestPossibleMapForDisabledResource(currentStateMap, stateModelDef));
     }
 
-    return computeBestPossibleMap(preferenceList, stateModelDef, currentStateMap, liveInstances,
-        disabledInstancesForPartition);
+    // (3) If the current states are not valid, fix the invalid part first.
+    if (!resolver.isCurrentStatesValid(currentStateOutput, resourceName, partition, stateModelDef)) {
+      Map<String, String> recoveryAssignment = resolver
+          .computeRecoveryAssignment(currentStateOutput, resourceName, partition, stateModelDef,
+              preferenceList);
+      if (recoveryAssignment == null || !recoveryAssignment.keySet()
+          .equals(currentStateMap.keySet())) {
+        throw new HelixException(String.format(
+            "Invalid recovery assignment %s since it changed the current partition placement %s",
+            recoveryAssignment, currentStateMap));
+      }
+      return Optional.of(recoveryAssignment);
+    }
+
+    return Optional.empty();
   }
 
-  protected Map<String, String> computeBestPossibleMapForDroppedResource(Map<String, String> currentStateMap) {
-    Map<String, String> bestPossibleStateMap = new HashMap<String, String>();
+  protected Map<String, String> computeBestPossibleMapForDroppedResource(
+      final Map<String, String> currentStateMap) {
+    Map<String, String> bestPossibleStateMap = new HashMap<>();
     for (String instance : currentStateMap.keySet()) {
       bestPossibleStateMap.put(instance, HelixDefinedState.DROPPED.toString());
     }
     return bestPossibleStateMap;
   }
 
-  protected Map<String, String> computeBestPossibleMapForDisabledResource(Map<String, String> currentStateMap
-      , StateModelDefinition stateModelDef) {
-    Map<String, String> bestPossibleStateMap = new HashMap<String, String>();
+  protected Map<String, String> computeBestPossibleMapForDisabledResource(
+      final Map<String, String> currentStateMap, StateModelDefinition stateModelDef) {
+    Map<String, String> bestPossibleStateMap = new HashMap<>();
     for (String instance : currentStateMap.keySet()) {
       if (!HelixDefinedState.ERROR.name().equals(currentStateMap.get(instance))) {
         bestPossibleStateMap.put(instance, stateModelDef.getInitialState());
@@ -267,7 +323,6 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
    */
   protected Map<String, String> computeBestPossibleMap(List<String> preferenceList, StateModelDefinition stateModelDef,
       Map<String, String> currentStateMap, Set<String> liveInstances, Set<String> disabledInstancesForPartition) {
-
     Map<String, String> bestPossibleStateMap = new HashMap<>();
 
     // (1) Instances that have current state but not in preference list, drop, no matter it's disabled or not.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index f4c95a6..f169e07 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -27,10 +27,12 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -263,7 +265,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
       Map<String, String> bestStateForPartition =
           computeBestPossibleStateForPartition(liveNodes, stateModelDef, preferenceList,
               currentStateOutput, disabledInstancesForPartition, idealState, clusterConfig,
-              partition);
+              partition, cache.getAbnormalStateResolver(stateModelDefName));
 
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
@@ -276,39 +278,20 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
     return partitionMapping;
   }
 
-  /**
-   * compute best state for resource in AUTO ideal state mode
-   * @param liveInstances
-   * @param stateModelDef
-   * @param preferenceList
-   * @param currentStateOutput
-   *          : instance->state for each partition
-   * @param disabledInstancesForPartition
-   * @param idealState
-   * @param  clusterConfig
-   * @param  partition
-   * @return
-   */
   @Override
   protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
-      IdealState idealState, ClusterConfig clusterConfig, Partition partition) {
-
+      IdealState idealState, ClusterConfig clusterConfig, Partition partition,
+      AbnormalStateResolver resolver) {
+    Optional<Map<String, String>> optionalOverwrittenStates =
+        computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
+            idealState, partition, resolver);
+    if (optionalOverwrittenStates.isPresent()) {
+      return optionalOverwrittenStates.get();
+    }
     Map<String, String> currentStateMap = new HashMap<>(
         currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition));
-
-    // (1) If the partition is removed from IS or the IS is deleted.
-    // Transit to DROPPED no matter the instance is disabled or not.
-    if (preferenceList == null) {
-      return computeBestPossibleMapForDroppedResource(currentStateMap);
-    }
-
-    // (2) If resource disabled altogether, transit to initial-state (e.g. OFFLINE) if it's not in ERROR.
-    if (!idealState.isEnabled()) {
-      return computeBestPossibleMapForDisabledResource(currentStateMap, stateModelDef);
-    }
-
     // Instances not in preference list but still have active replica, retain to avoid zero replica during movement
     List<String> currentInstances = new ArrayList<>(currentStateMap.keySet());
     Collections.sort(currentInstances);
@@ -332,7 +315,6 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
       }
     }
 
-
     // Sort the instancesToMove by their current partition state.
     // Reason: because the states are assigned to instances in the order appeared in preferenceList, if we have
     // [node1:Slave, node2:Master], we want to keep it that way, instead of assigning Master to node1.
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 165919c..bb0f728 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -30,10 +30,10 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
  * Cluster configurations
@@ -109,7 +109,13 @@ public class ClusterConfig extends HelixProperty {
     // https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
     //
     // Default to be true.
-    GLOBAL_REBALANCE_ASYNC_MODE
+    GLOBAL_REBALANCE_ASYNC_MODE,
+
+    /**
+     * Configure the abnormal partition states resolver classes for the corresponding state model.
+     * <State Model Def Name, Full Path of the Resolver Class Name>
+     */
+    ABNORMAL_STATES_RESOLVER_MAP
   }
 
   public enum GlobalRebalancePreferenceKey {
@@ -852,6 +858,24 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Set the abnormal state resolver class map.
+   */
+  public void setAbnormalStateResolverMap(Map<String, String> resolverMap) {
+    if (resolverMap.values().stream()
+        .anyMatch(className -> className == null || className.isEmpty())) {
+      throw new IllegalArgumentException(
+          "Invalid Abnormal State Resolver Map definition. Class name cannot be empty.");
+    }
+    _record.setMapField(ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name(), resolverMap);
+  }
+
+  public Map<String, String> getAbnormalStateResolverMap() {
+    Map<String, String> resolverMap =
+        _record.getMapField(ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name());
+    return resolverMap == null ? Collections.EMPTY_MAP : resolverMap;
+  }
+
+  /**
    * Get IdealState rules defined in the cluster config.
    * @return
    */
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
index 0886768..72bb726 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -52,7 +53,8 @@ public class TestAbstractRebalancer {
         .computeBestPossibleStateForPartition(new HashSet<>(liveInstances),
             BuiltInStateModelDefinitions.valueOf(stateModelName).getStateModelDefinition(),
             preferenceList, currentStateOutput, new HashSet<>(disabledInstancesForPartition),
-            new IdealState("test"), new ClusterConfig("TestCluster"), partition);
+            new IdealState("test"), new ClusterConfig("TestCluster"), partition,
+            AbnormalStateResolver.DUMMY_STATE_RESOLVER);
 
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap);
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index 0b1370e..0d09079 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -41,7 +41,7 @@ import com.google.common.collect.Sets;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.MockAccessor;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -52,6 +52,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -243,7 +244,8 @@ public class TestAutoRebalanceStrategy {
         }
         Map<String, String> assignment = new AutoRebalancer()
             .computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), _stateModelDef,
-                preferenceList, currentStateOutput, disabled, is, clusterConfig, p);
+                preferenceList, currentStateOutput, disabled, is, clusterConfig, p,
+                AbnormalStateResolver.DUMMY_STATE_RESOLVER);
         mapResult.put(partition, assignment);
       }
       return mapResult;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
index 9a5e085..33885ca 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
@@ -32,7 +32,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -41,6 +41,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
 import org.testng.Assert;
@@ -85,9 +86,10 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
         }
       }
     }
-    Map<String, String> bestPossibleMap = rebalancer.computeBestPossibleStateForPartition(
-        liveInstances, stateModelDef, instancePreferenceList, currentStateOutput,
-        Collections.emptySet(), is, new ClusterConfig("TestCluster"), partition);
+    Map<String, String> bestPossibleMap = rebalancer
+        .computeBestPossibleStateForPartition(liveInstances, stateModelDef, instancePreferenceList,
+            currentStateOutput, Collections.emptySet(), is, new ClusterConfig("TestCluster"),
+            partition, AbnormalStateResolver.DUMMY_STATE_RESOLVER);
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap,
         "Differs, get " + bestPossibleMap + "\nexpected: " + expectedBestPossibleMap
             + "\ncurrentState: " + currentStateMap + "\npreferenceList: " + instancePreferenceList);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
new file mode 100644
index 0000000..4718921
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
@@ -0,0 +1,48 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * A mock abnormal state resolver for supporting tests.
+ * It always return dummy result.
+ */
+public class MockAbnormalStateResolver implements AbnormalStateResolver {
+  @Override
+  public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef) {
+    // By default, all current states are valid.
+    return true;
+  }
+
+  public Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef, final List<String> preferenceList) {
+    throw new UnsupportedOperationException("The mock resolver won't recover abnormal states.");
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 7f8281b..ca8fd53 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -39,6 +40,7 @@ import org.apache.helix.model.ResourceConfig;
 import org.mockito.Mockito;
 import org.testng.annotations.BeforeClass;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.when;
 
 public abstract class AbstractTestClusterModel {
@@ -109,6 +111,8 @@ public abstract class AbstractTestClusterModel {
         _capacityDataMap.keySet().stream().collect(Collectors.toMap(key -> key, key -> 0)));
     testClusterConfig.setTopologyAwareEnabled(true);
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+    when(testCache.getAbnormalStateResolver(any()))
+        .thenReturn(AbnormalStateResolver.DUMMY_STATE_RESOLVER);
 
     // 3. Mock the live instance node for the default instance.
     LiveInstance testLiveInstance = createMockLiveInstance(_testInstanceId);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
new file mode 100644
index 0000000..dde2644
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
@@ -0,0 +1,67 @@
+package org.apache.helix.integration.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
+  @Test
+  public void testConfigureResolver() {
+    ResourceControllerDataProvider cache = new ResourceControllerDataProvider(CLUSTER_NAME);
+    // Before any configuration, every state model must resolve to the dummy resolver.
+    cache.refresh(_controller.getHelixDataAccessor());
+    for (String stateModelDefName : cache.getStateModelDefMap().keySet()) {
+      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getClass(),
+          AbnormalStateResolver.DUMMY_STATE_RESOLVER.getClass());
+    }
+    // Register the mock resolver for the MasterSlave state model only.
+    ConfigAccessor configAccessor = new ConfigAccessor.Builder().setZkAddress(ZK_ADDR).build();
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setAbnormalStateResolverMap(
+        ImmutableMap.of(MasterSlaveSMD.name, MockAbnormalStateResolver.class.getName()));
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    try {
+      cache.requireFullRefresh();
+      cache.refresh(_controller.getHelixDataAccessor());
+      for (String stateModelDefName : cache.getStateModelDefMap().keySet()) {
+        Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getClass(),
+            stateModelDefName.equals(MasterSlaveSMD.name) ? MockAbnormalStateResolver.class
+                : AbnormalStateResolver.DUMMY_STATE_RESOLVER.getClass());
+      }
+    } finally {
+      // Always reset the resolver map so a failed assertion cannot leave the mock resolver
+      // configured on the cluster that other tests share.
+      clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+      clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 8e4a016..f353293 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -256,4 +257,19 @@ public class TestClusterConfig {
         .getBooleanField(ClusterConfig.ClusterConfigProperty.GLOBAL_REBALANCE_ASYNC_MODE.name(),
             false), true);
   }
+
+  @Test
+  public void testAbnormalStatesResolverConfig() {
+    ClusterConfig clusterConfig = new ClusterConfig("testConfig");
+    // A freshly constructed config reports an empty resolver mapping.
+    Assert.assertEquals(clusterConfig.getAbnormalStateResolverMap(), Collections.emptyMap());
+    // Round-trip a single MasterSlave -> mock resolver entry through the config.
+    Map<String, String> expectedResolvers =
+        ImmutableMap.of(MasterSlaveSMD.name, MockAbnormalStateResolver.class.getName());
+    clusterConfig.setAbnormalStateResolverMap(expectedResolvers);
+    Assert.assertEquals(clusterConfig.getAbnormalStateResolverMap(), expectedResolvers);
+    // Storing an empty map must clear the previously stored mapping.
+    clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
+    Assert.assertEquals(clusterConfig.getAbnormalStateResolverMap(), Collections.emptyMap());
+  }
 }