You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/06/15 19:02:09 UTC

[helix] 03/03: Add monitor to record the abnormal states processing. (#1059)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch abnormalResolver
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 96b2ea3238ba071cd257a12af92b95f37ef0231c
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Mon Jun 15 11:53:07 2020 -0700

    Add monitor to record the abnormal states processing. (#1059)
    
    Example ObjectName of the new monitor MBean: Rebalancer:ClusterName=<clusterName>, EntityName=AbnormalStates.<StateModelDefName>
    Attributes:
    1. AbnormalStatePartitionCounter: records the total number of partitions that have been found in an abnormal state. Note that a partition found to be abnormal twice is counted twice.
    2. RecoveryAttemptCounter: records the total number of successful recovery computations performed by the resolver.
---
 .../constraint/AbnormalStateResolver.java          |  18 ----
 .../dataproviders/BaseControllerDataProvider.java  |  41 ++++++--
 .../controller/rebalancer/AbstractRebalancer.java  |  19 ++--
 .../rebalancer/DelayedAutoRebalancer.java          |   6 +-
 .../constraint/MonitoredAbnormalResolver.java      | 117 +++++++++++++++++++++
 .../metrics/AbnormalStatesMetricCollector.java     |  67 ++++++++++++
 .../rebalancer/TestAbstractRebalancer.java         |   4 +-
 .../rebalancer/TestAutoRebalanceStrategy.java      |   4 +-
 .../rebalancer/TestZeroReplicaAvoidance.java       |   4 +-
 .../TestAbnormalStatesResolverMonitor.java         |  88 ++++++++++++++++
 .../waged/model/AbstractTestClusterModel.java      |   4 +-
 .../rebalancer/TestAbnormalStatesResolver.java     |  54 ++++++----
 12 files changed, 356 insertions(+), 70 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
index 1a75e0b..309d98d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
+++ b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
@@ -31,24 +31,6 @@ import org.apache.helix.model.StateModelDefinition;
  */
 public interface AbnormalStateResolver {
   /**
-   * A placeholder which will be used when the resolver is not specified.
-   * This is a dummy class that does not really functional.
-   */
-  AbnormalStateResolver DUMMY_STATE_RESOLVER = new AbnormalStateResolver() {
-    public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
-        final String resourceName, final Partition partition,
-        final StateModelDefinition stateModelDef) {
-      // By default, all current states are valid.
-      return true;
-    }
-    public Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
-        final String resourceName, final Partition partition,
-        final StateModelDefinition stateModelDef, final List<String> preferenceList) {
-      throw new UnsupportedOperationException("This resolver won't recover abnormal states.");
-    }
-  };
-
-  /**
    * Check if the current states of the specified partition is valid.
    * @param currentStateOutput
    * @param resourceName
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 59058ab..e2f2ac2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -44,6 +44,7 @@ import org.apache.helix.common.caches.InstanceMessagesCache;
 import org.apache.helix.common.caches.PropertyCache;
 import org.apache.helix.common.controllers.ControlContextProvider;
 import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.CurrentState;
@@ -106,7 +107,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
   private Set<String> _disabledInstanceSet = new HashSet<>();
-  private final Map<String, AbnormalStateResolver> _abnormalStateResolverMap = new HashMap<>();
+  private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
 
   public BaseControllerDataProvider() {
     this(AbstractDataCache.UNKNOWN_CLUSTER, AbstractDataCache.UNKNOWN_PIPELINE);
@@ -391,7 +392,6 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     return _pipelineName;
   }
 
-
   @Override
   public String getClusterEventId() {
     return _clusterEventId;
@@ -737,10 +737,9 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     _asyncTasksThreadPool = asyncTasksThreadPool;
   }
 
-
-  public AbnormalStateResolver getAbnormalStateResolver(String stateModel) {
+  public MonitoredAbnormalResolver getAbnormalStateResolver(String stateModel) {
     return _abnormalStateResolverMap
-        .getOrDefault(stateModel, AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+        .getOrDefault(stateModel, MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
   }
 
   private void refreshAbnormalStateResolverMap(ClusterConfig clusterConfig) {
@@ -748,22 +747,43 @@ public class BaseControllerDataProvider implements ControlContextProvider {
       logger.debug("Skip refreshing abnormal state resolvers because the ClusterConfig is missing");
       return;
     }
+
     Map<String, String> resolverMap = clusterConfig.getAbnormalStateResolverMap();
     logger.info("Start loading the abnormal state resolvers with configuration {}", resolverMap);
-    // Remove any resolver configuration that does not exist anymore.
-    _abnormalStateResolverMap.keySet().retainAll(resolverMap.keySet());
+    // Calculate all the resolvers to be removed.
+    Map<String, MonitoredAbnormalResolver> removingResolverWraps =
+        new HashMap<>(_abnormalStateResolverMap);
+    removingResolverWraps.keySet().removeAll(resolverMap.keySet());
+    for (MonitoredAbnormalResolver monitoredAbnormalResolver : removingResolverWraps.values()) {
+      monitoredAbnormalResolver.close();
+    }
+
     // Reload the resolver classes into cache based on the configuration.
+    _abnormalStateResolverMap.keySet().retainAll(resolverMap.keySet());
     for (String stateModel : resolverMap.keySet()) {
       String resolverClassName = resolverMap.get(stateModel);
       if (resolverClassName == null || resolverClassName.isEmpty()) {
         // skip the empty definition.
         continue;
       }
-      if (!resolverClassName.equals(getAbnormalStateResolver(stateModel).getClass().getName())) {
+
+      MonitoredAbnormalResolver currentMonitoredResolver =
+          _abnormalStateResolverMap.get(stateModel);
+      if (currentMonitoredResolver == null || !resolverClassName
+          .equals(currentMonitoredResolver.getResolverClass().getName())) {
+
+        if (currentMonitoredResolver != null) {
+          // Clean up the existing monitored resolver.
+          // We must close the existing object first to ensure the metric being removed before the
+          // new one can be registered normally.
+          currentMonitoredResolver.close();
+        }
+
         try {
-          AbnormalStateResolver resolver = AbnormalStateResolver.class
+          AbnormalStateResolver newResolver = AbnormalStateResolver.class
               .cast(HelixUtil.loadClass(getClass(), resolverClassName).newInstance());
-          _abnormalStateResolverMap.put(stateModel, resolver);
+          _abnormalStateResolverMap.put(stateModel,
+              new MonitoredAbnormalResolver(newResolver, _clusterName, stateModel));
         } catch (Exception e) {
           throw new HelixException(String
               .format("Failed to instantiate the abnormal state resolver %s for state model %s",
@@ -771,6 +791,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
         }
       } // else, nothing to update since the same resolver class has been loaded.
     }
+
     logger.info("Finish loading the abnormal state resolvers {}", _abnormalStateResolverMap);
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index a1ca6ec..f118bcb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -33,9 +33,9 @@ import java.util.Set;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -192,17 +192,17 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
    * @param idealState
    * @param clusterConfig
    * @param partition
-   * @param resolver
+   * @param monitoredResolver
    * @return
    */
   protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
       IdealState idealState, ClusterConfig clusterConfig, Partition partition,
-      AbnormalStateResolver resolver) {
+      MonitoredAbnormalResolver monitoredResolver) {
     Optional<Map<String, String>> optionalOverwrittenStates =
         computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
-            idealState, partition, resolver);
+            idealState, partition, monitoredResolver);
     if (optionalOverwrittenStates.isPresent()) {
       return optionalOverwrittenStates.get();
     }
@@ -221,14 +221,14 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
    * @param currentStateOutput
    * @param idealState
    * @param partition
-   * @param resolver
+   * @param monitoredResolver
    * @return An optional object which contains the assignment map if overwritten is necessary.
    * Otherwise return Optional.empty().
    */
   protected Optional<Map<String, String>> computeStatesOverwriteForPartition(
       final StateModelDefinition stateModelDef, final List<String> preferenceList,
       final CurrentStateOutput currentStateOutput, IdealState idealState, final Partition partition,
-      final AbnormalStateResolver resolver) {
+      final MonitoredAbnormalResolver monitoredResolver) {
     String resourceName = idealState.getResourceName();
     Map<String, String> currentStateMap =
         currentStateOutput.getCurrentStateMap(resourceName, partition);
@@ -245,8 +245,10 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
     }
 
     // (3) If the current states are not valid, fix the invalid part first.
-    if (!resolver.checkCurrentStates(currentStateOutput, resourceName, partition, stateModelDef)) {
-      Map<String, String> recoveryAssignment = resolver
+    if (!monitoredResolver
+        .checkCurrentStates(currentStateOutput, resourceName, partition, stateModelDef)) {
+      monitoredResolver.recordAbnormalState();
+      Map<String, String> recoveryAssignment = monitoredResolver
           .computeRecoveryAssignment(currentStateOutput, resourceName, partition, stateModelDef,
               preferenceList);
       if (recoveryAssignment == null || !recoveryAssignment.keySet()
@@ -255,6 +257,7 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
             "Invalid recovery assignment %s since it changed the current partition placement %s",
             recoveryAssignment, currentStateMap));
       }
+      monitoredResolver.recordRecoveryAttempt();
       return Optional.of(recoveryAssignment);
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index f169e07..e0fe24d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -32,8 +32,8 @@ import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
@@ -283,10 +283,10 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
       IdealState idealState, ClusterConfig clusterConfig, Partition partition,
-      AbnormalStateResolver resolver) {
+      MonitoredAbnormalResolver monitoredResolver) {
     Optional<Map<String, String>> optionalOverwrittenStates =
         computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
-            idealState, partition, resolver);
+            idealState, partition, monitoredResolver);
     if (optionalOverwrittenStates.isPresent()) {
       return optionalOverwrittenStates.get();
     }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/MonitoredAbnormalResolver.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/MonitoredAbnormalResolver.java
new file mode 100644
index 0000000..85f633a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/MonitoredAbnormalResolver.java
@@ -0,0 +1,169 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.metrics.AbnormalStatesMetricCollector;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+
+/**
+ * A wrapper that adds monitoring to an {@link AbnormalStateResolver} implementation.
+ * The resolver methods delegate to the wrapped resolver, while recordAbnormalState() and
+ * recordRecoveryAttempt() increment the counters of an {@link AbnormalStatesMetricCollector}.
+ * Call {@link #close()} when the wrapper is discarded so that its MBean is unregistered.
+ */
+public class MonitoredAbnormalResolver implements AbnormalStateResolver {
+  private final AbnormalStateResolver _resolver;
+  // Null only for DUMMY_STATE_RESOLVER, which does not report any metrics.
+  private final AbnormalStatesMetricCollector _metricCollector;
+  // Makes close() idempotent. close() may run twice (explicitly and again from finalize()), and
+  // a repeated unregister may affect a replacement MBean registered later under the same name,
+  // depending on how MetricCollector#unregister resolves the MBean.
+  private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+  /**
+   * A placeholder which will be used when the resolver is not specified.
+   * It treats all current states as valid and never computes a recovery assignment. It has no
+   * metric collector, so the record methods are no-ops on it.
+   */
+  public final static MonitoredAbnormalResolver DUMMY_STATE_RESOLVER =
+      new MonitoredAbnormalResolver(new AbnormalStateResolver() {
+        @Override
+        public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
+            final String resourceName, final Partition partition,
+            final StateModelDefinition stateModelDef) {
+          // By default, all current states are valid.
+          return true;
+        }
+
+        @Override
+        public Map<String, String> computeRecoveryAssignment(
+            final CurrentStateOutput currentStateOutput, final String resourceName,
+            final Partition partition, final StateModelDefinition stateModelDef,
+            final List<String> preferenceList) {
+          throw new UnsupportedOperationException("This resolver won't recover abnormal states.");
+        }
+      }, null);
+
+  private MonitoredAbnormalResolver(AbnormalStateResolver resolver,
+      AbnormalStatesMetricCollector metricCollector) {
+    _resolver = validateResolver(resolver);
+    _metricCollector = metricCollector;
+  }
+
+  /**
+   * Wrap the given resolver and create its AbnormalStates metric collector.
+   * @param resolver the resolver to wrap; must not be a MonitoredAbnormalResolver itself
+   * @param clusterName the cluster name passed to the metric collector
+   * @param stateModelDef the state model definition name passed to the metric collector
+   * @throws IllegalArgumentException if resolver is a MonitoredAbnormalResolver
+   */
+  public MonitoredAbnormalResolver(AbnormalStateResolver resolver, String clusterName,
+      String stateModelDef) {
+    // Validate before creating the collector: its constructor registers the MBean (when
+    // clusterName is non-null), so rejecting the resolver afterwards would leak that MBean.
+    this(validateResolver(resolver),
+        new AbnormalStatesMetricCollector(clusterName, stateModelDef));
+  }
+
+  /**
+   * Reject a resolver that is itself a MonitoredAbnormalResolver; nested wrappers are not allowed.
+   * @return the given resolver, unchanged
+   */
+  private static AbnormalStateResolver validateResolver(AbnormalStateResolver resolver) {
+    if (resolver instanceof MonitoredAbnormalResolver) {
+      throw new IllegalArgumentException(
+          "Cannot construct a MonitoredAbnormalResolver wrap object using another MonitoredAbnormalResolver object.");
+    }
+    return resolver;
+  }
+
+  /**
+   * Increment the AbnormalStatePartitionCounter metric by one.
+   */
+  public void recordAbnormalState() {
+    incrementCounter(
+        AbnormalStatesMetricCollector.AbnormalStatesMetricNames.AbnormalStatePartitionCounter);
+  }
+
+  /**
+   * Increment the RecoveryAttemptCounter metric by one.
+   */
+  public void recordRecoveryAttempt() {
+    incrementCounter(
+        AbnormalStatesMetricCollector.AbnormalStatesMetricNames.RecoveryAttemptCounter);
+  }
+
+  private void incrementCounter(
+      AbnormalStatesMetricCollector.AbnormalStatesMetricNames metricName) {
+    if (_metricCollector == null) {
+      // DUMMY_STATE_RESOLVER has no collector, so there is nothing to record.
+      return;
+    }
+    _metricCollector.getMetric(metricName.name(), CountMetric.class).increment(1);
+  }
+
+  /**
+   * @return the class of the wrapped resolver
+   */
+  public Class<? extends AbnormalStateResolver> getResolverClass() {
+    return _resolver.getClass();
+  }
+
+  @Override
+  public boolean checkCurrentStates(CurrentStateOutput currentStateOutput, String resourceName,
+      Partition partition, StateModelDefinition stateModelDef) {
+    return _resolver
+        .checkCurrentStates(currentStateOutput, resourceName, partition, stateModelDef);
+  }
+
+  @Override
+  public Map<String, String> computeRecoveryAssignment(CurrentStateOutput currentStateOutput,
+      String resourceName, Partition partition, StateModelDefinition stateModelDef,
+      List<String> preferenceList) {
+    return _resolver
+        .computeRecoveryAssignment(currentStateOutput, resourceName, partition, stateModelDef,
+            preferenceList);
+  }
+
+  /**
+   * Unregister the metric collector's MBean, if there is one. Only the first call has an effect.
+   */
+  public void close() {
+    if (_metricCollector != null && _closed.compareAndSet(false, true)) {
+      _metricCollector.unregister();
+    }
+  }
+
+  /**
+   * Fallback cleanup for a wrapper that was never closed explicitly; prefer calling close().
+   */
+  @Override
+  public void finalize() {
+    close();
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/AbnormalStatesMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/AbnormalStatesMetricCollector.java
new file mode 100644
index 0000000..a124624
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/AbnormalStatesMetricCollector.java
@@ -0,0 +1,78 @@
+package org.apache.helix.monitoring.metrics;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import javax.management.JMException;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.metrics.implementation.RebalanceCounter;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+
+/**
+ * Metric collector for the abnormal state resolver of one state model in one cluster. Its
+ * MBean uses the Rebalancer domain and the entity name {@code AbnormalStates.<stateModelDef>}.
+ */
+public class AbnormalStatesMetricCollector extends MetricCollector {
+  private static final String ABNORMAL_STATES_ENTITY_NAME = "AbnormalStates";
+
+  /**
+   * This enum class contains all metric names defined for AbnormalStateResolver.
+   * Each constant's name() is used as the metric name, and one counter is created per constant.
+   */
+  public enum AbnormalStatesMetricNames {
+    // The counter of the partitions that contain an abnormal state.
+    AbnormalStatePartitionCounter,
+    // The counter of the attempts that the resolver made to recover the abnormal state.
+    RecoveryAttemptCounter
+  }
+
+  /**
+   * Create the collector with one counter per {@link AbnormalStatesMetricNames} constant.
+   * @param clusterName the cluster name; the MBean is registered only when it is non-null
+   * @param stateModelDef the state model definition name, appended to the entity name
+   * @throws HelixException if registering the MBean fails
+   */
+  public AbnormalStatesMetricCollector(String clusterName, String stateModelDef) {
+    super(MonitorDomainNames.Rebalancer.name(), clusterName,
+        String.format("%s.%s", ABNORMAL_STATES_ENTITY_NAME, stateModelDef));
+    createMetrics();
+    if (clusterName != null) {
+      try {
+        register();
+      } catch (JMException e) {
+        throw new HelixException(
+            "Failed to register MBean for the " + AbnormalStatesMetricCollector.class
+                .getSimpleName(), e);
+      }
+    }
+  }
+
+  /**
+   * Create one counter for every metric name, so the enum and the registered metrics cannot
+   * drift apart when a new metric name is added.
+   */
+  private void createMetrics() {
+    for (AbnormalStatesMetricNames metricName : AbnormalStatesMetricNames.values()) {
+      CountMetric counter = new RebalanceCounter(metricName.name());
+      addMetric(counter);
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
index 72bb726..6ecaa30 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
@@ -23,7 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -54,7 +54,7 @@ public class TestAbstractRebalancer {
             BuiltInStateModelDefinitions.valueOf(stateModelName).getStateModelDefinition(),
             preferenceList, currentStateOutput, new HashSet<>(disabledInstancesForPartition),
             new IdealState("test"), new ClusterConfig("TestCluster"), partition,
-            AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+            MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
 
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap);
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index 0d09079..a4bbf52 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -41,8 +41,8 @@ import com.google.common.collect.Sets;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.MockAccessor;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -245,7 +245,7 @@ public class TestAutoRebalanceStrategy {
         Map<String, String> assignment = new AutoRebalancer()
             .computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), _stateModelDef,
                 preferenceList, currentStateOutput, disabled, is, clusterConfig, p,
-                AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+                MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
         mapResult.put(partition, assignment);
       }
       return mapResult;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
index 33885ca..817aa62 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
@@ -32,7 +32,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -89,7 +89,7 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
     Map<String, String> bestPossibleMap = rebalancer
         .computeBestPossibleStateForPartition(liveInstances, stateModelDef, instancePreferenceList,
             currentStateOutput, Collections.emptySet(), is, new ClusterConfig("TestCluster"),
-            partition, AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+            partition, MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap,
         "Differs, get " + bestPossibleMap + "\nexpected: " + expectedBestPossibleMap
             + "\ncurrentState: " + currentStateMap + "\npreferenceList: " + instancePreferenceList);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/TestAbnormalStatesResolverMonitor.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/TestAbnormalStatesResolverMonitor.java
new file mode 100644
index 0000000..98ff87a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/TestAbnormalStatesResolverMonitor.java
@@ -0,0 +1,94 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+import java.util.Random;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.metrics.AbnormalStatesMetricCollector;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies that MonitoredAbnormalResolver registers, updates and unregisters its MBean.
+ */
+public class TestAbnormalStatesResolverMonitor {
+  private static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer();
+  private static final String CLUSTER_NAME = "TestCluster";
+
+  @Test
+  public void testMonitorResolver()
+      throws MalformedObjectNameException, AttributeNotFoundException, MBeanException,
+      ReflectionException, InstanceNotFoundException {
+    final String testResolverMonitorMbeanName = String
+        .format("%s:%s=%s, %s=%s.%s", MonitorDomainNames.Rebalancer, "ClusterName", CLUSTER_NAME,
+            "EntityName", "AbnormalStates", MasterSlaveSMD.name);
+    final ObjectName testResolverMonitorMbeanObjectName =
+        new ObjectName(testResolverMonitorMbeanName);
+
+    Assert.assertFalse(MBEAN_SERVER.isRegistered(testResolverMonitorMbeanObjectName));
+
+    // Wrap a mock resolver for the MasterSlave state model; this registers the MBean.
+    MonitoredAbnormalResolver monitoredAbnormalResolver =
+        new MonitoredAbnormalResolver(new MockAbnormalStateResolver(), CLUSTER_NAME,
+            MasterSlaveSMD.name);
+    try {
+      // Validate if the MBean has been registered with zeroed counters.
+      Assert.assertTrue(MBEAN_SERVER.isRegistered(testResolverMonitorMbeanObjectName));
+      Assert.assertEquals(MBEAN_SERVER.getAttribute(testResolverMonitorMbeanObjectName,
+          AbnormalStatesMetricCollector.AbnormalStatesMetricNames.AbnormalStatePartitionCounter
+              .name()), 0L);
+      Assert.assertEquals(MBEAN_SERVER.getAttribute(testResolverMonitorMbeanObjectName,
+          AbnormalStatesMetricCollector.AbnormalStatesMetricNames.RecoveryAttemptCounter.name()),
+          0L);
+      // Validate if the metrics recording methods work as expected.
+      Random ran = new Random(System.currentTimeMillis());
+      long expectation = 1L + ran.nextInt(10);
+      for (int i = 0; i < expectation; i++) {
+        monitoredAbnormalResolver.recordAbnormalState();
+      }
+      Assert.assertEquals(MBEAN_SERVER.getAttribute(testResolverMonitorMbeanObjectName,
+          AbnormalStatesMetricCollector.AbnormalStatesMetricNames.AbnormalStatePartitionCounter
+              .name()), expectation);
+      expectation = 1L + ran.nextInt(10);
+      for (int i = 0; i < expectation; i++) {
+        monitoredAbnormalResolver.recordRecoveryAttempt();
+      }
+      Assert.assertEquals(MBEAN_SERVER.getAttribute(testResolverMonitorMbeanObjectName,
+          AbnormalStatesMetricCollector.AbnormalStatesMetricNames.RecoveryAttemptCounter.name()),
+          expectation);
+    } finally {
+      // Always unregister, so a failed assertion does not leave the MBean registered for later
+      // tests that use the same cluster and state model names.
+      monitoredAbnormalResolver.close();
+    }
+    // Validate if the MBean has been unregistered.
+    Assert.assertFalse(MBEAN_SERVER.isRegistered(testResolverMonitorMbeanObjectName));
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index ca8fd53..e09d090 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -29,8 +29,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
@@ -112,7 +112,7 @@ public abstract class AbstractTestClusterModel {
     testClusterConfig.setTopologyAwareEnabled(true);
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
     when(testCache.getAbnormalStateResolver(any()))
-        .thenReturn(AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+        .thenReturn(MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
 
     // 3. Mock the live instance node for the default instance.
     LiveInstance testLiveInstance = createMockLiveInstance(_testInstanceId);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
index e10645f..5112491 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
@@ -32,10 +32,10 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
-import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.constraint.ExcessiveTopStateResolver;
 import org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
+import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -50,14 +50,18 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
+  // TODO: remove this wait time once we have a better way to determine if the rebalance has been
+  // TODO: done as a reaction of the test operations.
+  protected static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = 1000;
+
   @Test
   public void testConfigureResolver() {
     ResourceControllerDataProvider cache = new ResourceControllerDataProvider(CLUSTER_NAME);
     // Verify the initial setup.
     cache.refresh(_controller.getHelixDataAccessor());
     for (String stateModelDefName : cache.getStateModelDefMap().keySet()) {
-      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getClass(),
-          AbnormalStateResolver.DUMMY_STATE_RESOLVER.getClass());
+      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getResolverClass(),
+          MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER.getResolverClass());
     }
 
     // Update the resolver configuration for MasterSlave state model.
@@ -70,10 +74,10 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
     cache.requireFullRefresh();
     cache.refresh(_controller.getHelixDataAccessor());
     for (String stateModelDefName : cache.getStateModelDefMap().keySet()) {
-      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getClass(),
+      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getResolverClass(),
           stateModelDefName.equals(MasterSlaveSMD.name) ?
               MockAbnormalStateResolver.class :
-              AbnormalStateResolver.DUMMY_STATE_RESOLVER.getClass());
+              MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER.getResolverClass());
     }
 
     // Reset the resolver map
@@ -83,7 +87,7 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
   }
 
   @Test(dependsOnMethods = "testConfigureResolver")
-  public void testExcessiveTopStateResolver() {
+  public void testExcessiveTopStateResolver() throws InterruptedException {
     BestPossibleExternalViewVerifier verifier =
         new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
     Assert.assertTrue(verifier.verify());
@@ -93,16 +97,17 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
     ExternalView ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
     String targetPartition = ev.getPartitionSet().iterator().next();
     Map<String, String> partitionAssignment = ev.getStateMap(targetPartition);
-    String slaveHost =
-        partitionAssignment.entrySet().stream().filter(entry -> entry.getValue().equals("SLAVE"))
-            .findAny().get().getKey();
-    long previousMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+    String slaveHost = partitionAssignment.entrySet().stream()
+        .filter(entry -> entry.getValue().equals(MasterSlaveSMD.States.SLAVE.name())).findAny()
+        .get().getKey();
+    long previousMasterUpdateTime =
+        getTopStateUpdateTime(ev, targetPartition, MasterSlaveSMD.States.MASTER.name());
 
     // Build SLAVE to MASTER message
     String msgId = new UUID(123, 456).toString();
-    Message msg =
-        createMessage(Message.MessageType.STATE_TRANSITION, msgId, "SLAVE", "MASTER", TEST_DB,
-            slaveHost);
+    Message msg = createMessage(Message.MessageType.STATE_TRANSITION, msgId,
+        MasterSlaveSMD.States.SLAVE.name(), MasterSlaveSMD.States.MASTER.name(), TEST_DB,
+        slaveHost);
     msg.setStateModelDef(MasterSlaveSMD.name);
 
     Criteria cr = new Criteria();
@@ -130,17 +135,18 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
     // 2.A. Without resolver, the fixing is not completely done by the default rebalancer logic.
     _controller.getMessagingService()
         .sendAndWait(cr, msg, callback, (int) TestHelper.WAIT_DURATION);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     // Wait until the partition status is fixed, verify if the result is as expected
     verifier =
         new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
-    Assert.assertTrue(verifier.verify());
+    Assert.assertTrue(verifier.verifyByPolling());
     ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
-    Assert.assertEquals(
-        ev.getStateMap(targetPartition).values().stream().filter(state -> state.equals("MASTER"))
-            .count(), 1);
+    Assert.assertEquals(ev.getStateMap(targetPartition).values().stream()
+        .filter(state -> state.equals(MasterSlaveSMD.States.MASTER.name())).count(), 1);
     // Since the resolver is not used in the auto default fix process, there is no update on the
     // original master. So if there is any data issue, it was not fixed.
-    long currentMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+    long currentMasterUpdateTime =
+        getTopStateUpdateTime(ev, targetPartition, MasterSlaveSMD.States.MASTER.name());
     Assert.assertFalse(currentMasterUpdateTime > previousMasterUpdateTime);
 
     // 2.B. with resolver configured, the fixing is complete.
@@ -149,17 +155,19 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
     clusterConfig.setAbnormalStateResolverMap(
         ImmutableMap.of(MasterSlaveSMD.name, ExcessiveTopStateResolver.class.getName()));
     configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
     _controller.getMessagingService()
         .sendAndWait(cr, msg, callback, (int) TestHelper.WAIT_DURATION);
+    Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME);
     // Wait until the partition status is fixed, verify if the result is as expected
-    Assert.assertTrue(verifier.verify());
+    Assert.assertTrue(verifier.verifyByPolling());
     ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
-    Assert.assertEquals(
-        ev.getStateMap(targetPartition).values().stream().filter(state -> state.equals("MASTER"))
-            .count(), 1);
+    Assert.assertEquals(ev.getStateMap(targetPartition).values().stream()
+        .filter(state -> state.equals(MasterSlaveSMD.States.MASTER.name())).count(), 1);
     // Now the resolver is used in the auto fix process, the original master has also been refreshed.
     // The potential data issue has been fixed in this process.
-    currentMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+    currentMasterUpdateTime =
+        getTopStateUpdateTime(ev, targetPartition, MasterSlaveSMD.States.MASTER.name());
     Assert.assertTrue(currentMasterUpdateTime > previousMasterUpdateTime);
 
     // Reset the resolver map