You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/11/01 23:02:18 UTC

[4/8] helix git commit: Emitting per resource rebalance status for possible calculation failure.

Emitting per resource rebalance status for possible calculation failure.

The status is exposed in the MBean as a string, for debugging purposes only.
The resource rebalance state attribute will be in one of the following states:
1. NORMAL
2. BEST_POSSIBLE_STATE_CAL_FAILED: calculation failed or no possible allocation found.
3. INTERMEDIATE_STATE_CAL_FAILED: intermediate state calculation failed (this does not include the throttled case).
4. UNKNOWN: the resource has not been rebalanced yet or is newly created.

Additional related changes:
1. Fix a cluster-level metric bug so that the correct metrics data is generated.
2. Fix a resource monitoring bug where DISABLE_MONITORING did not take effect.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2f39f381
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2f39f381
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2f39f381

Branch: refs/heads/master
Commit: 2f39f381b0981503d7c204aabbeaa09153292e15
Parents: b549cda
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Oct 5 16:26:11 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Nov 1 14:38:36 2018 -0700

----------------------------------------------------------------------
 .../stages/BestPossibleStateCalcStage.java      |  52 ++++----
 .../stages/ExternalViewComputeStage.java        |   4 +-
 .../stages/IntermediateStateCalcStage.java      |  19 +++
 .../monitoring/mbeans/ClusterStatusMonitor.java |  66 ++++++----
 .../monitoring/mbeans/ResourceMonitor.java      |  35 ++++--
 .../dynamicMBeans/DynamicMBeanProvider.java     |  18 +--
 .../TestAlertingRebalancerFailure.java          | 123 +++++++++++++------
 ...ceModeWhenReachingOfflineInstancesLimit.java |  61 ++++++---
 .../mbeans/TestClusterStatusMonitor.java        |  10 +-
 .../mbeans/TestDisableResourceMbean.java        |  17 ++-
 .../monitoring/mbeans/TestResourceMonitor.java  |  21 +++-
 11 files changed, 291 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 1bbd6a0..b0e453d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -43,6 +43,7 @@ import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskRebalancer;
 import org.apache.helix.util.HelixUtil;
@@ -75,11 +76,6 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     // Reset current INIT/RUNNING tasks on participants for throttling
     cache.resetActiveTaskCount(currentStateOutput);
 
-    // Check whether the offline/disabled instance count in the cluster reaches the set limit,
-    // if yes, pause the rebalancer.
-    validateOfflineInstancesLimit(cache,
-        (HelixManager) event.getAttribute(AttributeName.helixmanager.name()), clusterStatusMonitor);
-
     final BestPossibleStateOutput bestPossibleStateOutput =
         compute(event, resourceMap, currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
@@ -112,6 +108,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     BestPossibleStateOutput output = new BestPossibleStateOutput();
 
     HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name());
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+
+    // Check whether the offline/disabled instance count in the cluster reaches the set limit,
+    // if yes, pause the rebalancer.
+    boolean isValid = validateOfflineInstancesLimit(cache,
+        (HelixManager) event.getAttribute(AttributeName.helixmanager.name()));
 
     final List<String> failureResources = new ArrayList<>();
     Iterator<Resource> itr = resourceMap.values().iterator();
@@ -125,6 +128,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
         LogUtil.logError(logger, _eventId,
             "Exception when calculating best possible states for " + resource.getResourceName(),
             ex);
+
       }
       if (!result) {
         failureResources.add(resource.getResourceName());
@@ -134,31 +138,34 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     }
 
     // Check and report if resource rebalance has failure
-    ClusterStatusMonitor clusterStatusMonitor =
-        event.getAttribute(AttributeName.clusterStatusMonitor.name());
-    updateRebalanceStatus(!failureResources.isEmpty(), helixManager, cache, clusterStatusMonitor,
+    updateRebalanceStatus(!isValid || !failureResources.isEmpty(), failureResources, helixManager,
+        cache, clusterStatusMonitor,
         "Failed to calculate best possible states for " + failureResources.size() + " resources.");
 
     return output;
   }
 
-  private void updateRebalanceStatus(final boolean hasFailure, final HelixManager helixManager,
-      final ClusterDataCache cache, final ClusterStatusMonitor clusterStatusMonitor,
-      final String errorMessage) {
+  private void updateRebalanceStatus(final boolean hasFailure, final List<String> failedResources,
+      final HelixManager helixManager, final ClusterDataCache cache,
+      final ClusterStatusMonitor clusterStatusMonitor, final String errorMessage) {
     asyncExecute(cache.getAsyncTasksThreadPool(), new Callable<Object>() {
       @Override
       public Object call() {
         try {
-          // TODO re-enable logging error after ticket HELIX-631 is resolved
-          /*
-          if (hasFailure && _statusUpdateUtil != null) {
-            _statusUpdateUtil
-                .logError(StatusUpdateUtil.ErrorType.RebalanceResourceFailure, this.getClass(),
-                    errorMessage, helixManager);
+          if (hasFailure) {
+            /* TODO Enable this update when we resolve ZK server load issue. This will cause extra write to ZK.
+            if (_statusUpdateUtil != null) {
+              _statusUpdateUtil
+                  .logError(StatusUpdateUtil.ErrorType.RebalanceResourceFailure, this.getClass(),
+                      errorMessage, helixManager);
+            }
+            */
+            LogUtil.logWarn(logger, _eventId, errorMessage);
           }
-          */
           if (clusterStatusMonitor != null) {
             clusterStatusMonitor.setRebalanceFailureGauge(hasFailure);
+            clusterStatusMonitor.setResourceRebalanceStates(failedResources,
+                ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED);
           }
         } catch (Exception e) {
           LogUtil.logError(logger, _eventId, "Could not update cluster status!", e);
@@ -170,8 +177,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
 
   // Check whether the offline/disabled instance count in the cluster reaches the set limit,
   // if yes, pause the rebalancer, and throw exception to terminate rebalance cycle.
-  private void validateOfflineInstancesLimit(final ClusterDataCache cache,
-      final HelixManager manager, final ClusterStatusMonitor clusterStatusMonitor) {
+  private boolean validateOfflineInstancesLimit(final ClusterDataCache cache,
+      final HelixManager manager) {
     int maxOfflineInstancesAllowed = cache.getClusterConfig().getMaxOfflineInstancesAllowed();
     if (maxOfflineInstancesAllowed >= 0) {
       int offlineCount = cache.getAllInstances().size() - cache.getEnabledLiveInstances().size();
@@ -190,11 +197,10 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
           LogUtil.logError(logger, _eventId, "Failed to put cluster " + cache.getClusterName()
               + " into maintenance mode, HelixManager is not set!");
         }
-        if (!cache.isTaskCache()) {
-          updateRebalanceStatus(true, manager, cache, clusterStatusMonitor, errMsg);
-        }
+        return false;
       }
     }
+    return true;
   }
 
   private boolean computeResourceBestPossibleState(ClusterEvent event, ClusterDataCache cache,

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index e3a504b..667b254 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -187,9 +187,7 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
               cache.getStateModelDef(idealState.getStateModelDefRef());
           clusterStatusMonitor
               .setResourceStatus(view, cache.getIdealState(view.getResourceName()),
-                  stateModelDef);
-          clusterStatusMonitor
-              .updatePendingMessages(resource.getResourceName(), totalPendingMessageCount);
+                  stateModelDef, totalPendingMessageCount);
           monitoringResources.add(resourceName);
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index c4d11d6..915a90f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -45,6 +45,7 @@ import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,6 +138,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
 
     ClusterStatusMonitor clusterStatusMonitor =
         event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    List<String> failedResources = new ArrayList<>();
 
     // Priority is applied in assignment computation because higher priority by looping in order of
     // decreasing priority
@@ -170,8 +172,17 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
       } catch (HelixException ex) {
         LogUtil.logInfo(logger, _eventId,
             "Failed to calculate intermediate partition states for resource " + resourceName, ex);
+        failedResources.add(resourceName);
       }
     }
+
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.setResourceRebalanceStates(failedResources,
+          ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
+      clusterStatusMonitor
+          .setResourceRebalanceStates(output.resourceSet(), ResourceMonitor.RebalanceStatus.NORMAL);
+    }
+
     return output;
   }
 
@@ -237,6 +248,14 @@ public class IntermediateStateCalcStage extends AbstractBaseStage {
                       + " mode due to an instance being assigned more replicas/partitions than "
                       + "the limit.");
             }
+
+            ClusterStatusMonitor clusterStatusMonitor =
+                event.getAttribute(AttributeName.clusterStatusMonitor.name());
+            if (clusterStatusMonitor != null) {
+              clusterStatusMonitor.setResourceRebalanceStates(Collections.singletonList(resource),
+                  ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
+            }
+
             throw new HelixException(errMsg);
           }
           instancePartitionCounts.put(instance, partitionCount);

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index f870ddc..803bd3c 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -19,9 +19,10 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.util.Arrays;
 import java.util.Collection;
@@ -33,10 +34,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -61,7 +62,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   static final String RESOURCE_STATUS_KEY = "ResourceStatus";
   public static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
   public static final String CLUSTER_DN_KEY = "cluster";
-  static final String RESOURCE_DN_KEY = "resourceName";
+  public static final String RESOURCE_DN_KEY = "resourceName";
   static final String INSTANCE_DN_KEY = "instanceName";
   static final String MESSAGE_QUEUE_DN_KEY = "messageQueue";
   static final String WORKFLOW_TYPE_DN_KEY = "workflowType";
@@ -160,6 +161,16 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     this._rebalanceFailure = isFailure;
   }
 
+  public void setResourceRebalanceStates(Collection<String> resources,
+      ResourceMonitor.RebalanceStatus state) {
+    for (String resource : resources) {
+      ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resource);
+      if (resourceMonitor != null) {
+        resourceMonitor.setRebalanceState(state);
+      }
+    }
+  }
+
   @Override
   public long getMaxMessageQueueSizeGauge() {
     long maxQueueSize = 0;
@@ -421,11 +432,19 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   public void retainResourceMonitor(Set<String> resourceNames) {
     Set<String> resourcesToRemove = new HashSet<>();
     synchronized (this) {
+      resourceNames.retainAll(_resourceMbeanMap.keySet());
       resourcesToRemove.addAll(_resourceMbeanMap.keySet());
     }
     resourcesToRemove.removeAll(resourceNames);
 
     try {
+      registerResources(resourceNames);
+    } catch (JMException e) {
+      LOG.error(String.format("Could not register beans for the following resources: %s",
+          Joiner.on(',').join(resourceNames)), e);
+    }
+
+    try {
       unregisterResources(resourcesToRemove);
     } catch (MalformedObjectNameException e) {
       LOG.error(String.format("Could not unregister beans for the following resources: %s",
@@ -433,12 +452,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public void setResourceStatus(ExternalView externalView, IdealState idealState, StateModelDefinition stateModelDef) {
+  public void setResourceStatus(ExternalView externalView, IdealState idealState,
+      StateModelDefinition stateModelDef, int messageCount) {
     try {
       ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(externalView.getId());
 
       if (resourceMonitor != null) {
-        resourceMonitor.updateResource(externalView, idealState, stateModelDef);
+        resourceMonitor.updateResourceState(externalView, idealState, stateModelDef);
+        resourceMonitor.updatePendingStateTransitionMessages(messageCount);
       }
     } catch (Exception e) {
       LOG.error("Fail to set resource status, resource: " + idealState.getResourceName(), e);
@@ -461,24 +482,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
     if (resourceMonitor != null) {
-      resourceMonitor.updateRebalancerStat(numPendingRecoveryRebalancePartitions,
+      resourceMonitor.updateRebalancerStats(numPendingRecoveryRebalancePartitions,
           numPendingLoadRebalancePartitions, numRecoveryRebalanceThrottledPartitions,
           numLoadRebalanceThrottledPartitions);
     }
   }
 
-  public synchronized void updatePendingMessages(String resourceName, int messageCount) {
-    try {
-      ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
-
-      if (resourceMonitor != null) {
-        resourceMonitor.updatePendingStateTransitionMessages(messageCount);
-      }
-    } catch (Exception e) {
-      LOG.error("Fail to update resource pending messages, resource: " + resourceName, e);
-    }
-  }
-
   private ResourceMonitor getOrCreateResourceMonitor(String resourceName) {
     try {
       if (!_resourceMbeanMap.containsKey(resourceName)) {
@@ -487,7 +496,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
             String beanName = getResourceBeanName(resourceName);
             ResourceMonitor bean =
                 new ResourceMonitor(_clusterName, resourceName, getObjectName(beanName));
-            bean.register();
             _resourceMbeanMap.put(resourceName, bean);
           }
         }
@@ -663,6 +671,15 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     _instanceMbeanMap.keySet().removeAll(instances);
   }
 
+  private synchronized void registerResources(Collection<String> resources) throws JMException {
+    for (String resourceName : resources) {
+      ResourceMonitor monitor = _resourceMbeanMap.get(resourceName);
+      if (monitor != null) {
+        monitor.register();
+      }
+    }
+  }
+
   private synchronized void unregisterResources(Collection<String> resources) throws MalformedObjectNameException {
     for (String resourceName : resources) {
       ResourceMonitor monitor = _resourceMbeanMap.get(resourceName);
@@ -729,6 +746,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
+  // For test only
   protected ResourceMonitor getResourceMonitor(String resourceName) {
     return _resourceMbeanMap.get(resourceName);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index c3dd242..fb9a779 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -41,6 +41,13 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
 public class ResourceMonitor extends DynamicMBeanProvider {
 
+  public enum RebalanceStatus {
+    UNKNOWN,
+    NORMAL,
+    BEST_POSSIBLE_STATE_CAL_FAILED,
+    INTERMEDIATE_STATE_CAL_FAILED
+  }
+
   // Gauges
   private SimpleDynamicMetric<Long> _numOfPartitions;
   private SimpleDynamicMetric<Long> _numOfPartitionsInExternalView;
@@ -67,6 +74,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   private HistogramDynamicMetric _partitionTopStateHandoffUserLatencyGauge;
   private HistogramDynamicMetric _partitionTopStateNonGracefulHandoffDurationGauge;
 
+  private SimpleDynamicMetric<String> _rebalanceState;
+
   private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private long _lastResetTime;
   private final String _resourceName;
@@ -96,6 +105,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge);
     attributeList.add(_totalMessageReceived);
     attributeList.add(_numPendingStateTransitions);
+    attributeList.add(_rebalanceState);
     doRegister(attributeList, _initObjectName);
     return this;
   }
@@ -146,6 +156,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _successTopStateHandoffCounter = new SimpleDynamicMetric("SucceededTopStateHandoffCounter", 0L);
     _successfulTopStateHandoffDurationCounter =
         new SimpleDynamicMetric("SuccessfulTopStateHandoffDurationCounter", 0L);
+
+    _rebalanceState = new SimpleDynamicMetric<>("RebalanceStatus", RebalanceStatus.UNKNOWN.name());
   }
 
   @Override
@@ -214,7 +226,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     return _clusterName + " " + _resourceName;
   }
 
-  public void updateResource(ExternalView externalView, IdealState idealState,
+  public void updateResourceState(ExternalView externalView, IdealState idealState,
       StateModelDefinition stateModelDef) {
     if (externalView == null) {
       _logger.warn("External view is null");
@@ -229,7 +241,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
       }
     }
 
-    resetGauges();
+    resetResourceStateGauges();
 
     if (idealState == null) {
       _logger.warn("ideal state is null for {}", _resourceName);
@@ -319,20 +331,13 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     }
   }
 
-  private void resetGauges() {
+  private void resetResourceStateGauges() {
     _numOfErrorPartitions.updateValue(0L);
     _numNonTopStatePartitions.updateValue(0L);
     _externalViewIdealStateDiff.updateValue(0L);
     _numOfPartitionsInExternalView.updateValue(0L);
-
-    // The following gauges are computed each call to updateResource by way of looping so need to be reset.
     _numLessMinActiveReplicaPartitions.updateValue(0L);
     _numLessReplicaPartitions.updateValue(0L);
-    _numPendingRecoveryRebalancePartitions.updateValue(0L);
-    _numPendingLoadRebalancePartitions.updateValue(0L);
-    _numRecoveryRebalanceThrottledPartitions.updateValue(0L);
-    _numLoadRebalanceThrottledPartitions.updateValue(0L);
-    _numPendingStateTransitions.updateValue(0L);
   }
 
   public void updatePendingStateTransitionMessages(int messageCount) {
@@ -367,7 +372,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     }
   }
 
-  public void updateRebalancerStat(long numPendingRecoveryRebalancePartitions,
+  public void updateRebalancerStats(long numPendingRecoveryRebalancePartitions,
       long numPendingLoadRebalancePartitions, long numRecoveryRebalanceThrottledPartitions,
       long numLoadRebalanceThrottledPartitions) {
     _numPendingRecoveryRebalancePartitions.updateValue(numPendingRecoveryRebalancePartitions);
@@ -376,6 +381,10 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions);
   }
 
+  public void setRebalanceState(RebalanceStatus state) {
+    _rebalanceState.updateValue(state.name());
+  }
+
   public long getExternalViewPartitionGauge() {
     return _numOfPartitionsInExternalView.getValue();
   }
@@ -408,6 +417,10 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     return _numPendingStateTransitions.getValue();
   }
 
+  public String getRebalanceState() {
+    return _rebalanceState.getValue();
+  }
+
   public void resetMaxTopStateHandoffGauge() {
     if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis()) {
       _maxSinglePartitionTopStateHandoffDuration.updateValue(0L);

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
index 988ba9b..fbbb9e6 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
@@ -51,14 +51,15 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
    * @param domain         the MBean domain name
    * @param keyValuePairs  the MBean object name components
    */
-  protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
+  protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
       String description, String domain, String... keyValuePairs) throws JMException {
     if (_objectName != null) {
-      throw new HelixException(
-          "Mbean has been registered before. Please create new object for new registration.");
+      _logger.warn("Mbean has been registered before. Please create new object for new registration.");
+      return false;
     }
     updateAttributtInfos(dynamicMetrics, description);
     _objectName = MBeanRegistrar.register(this, domain, keyValuePairs);
+    return true;
   }
 
   /**
@@ -68,19 +69,20 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
    * @param description    the MBean description
    * @param objectName     the proposed MBean ObjectName
    */
-  protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
+  protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
       String description, ObjectName objectName) throws JMException {
     if (_objectName != null) {
-      throw new HelixException(
-          "Mbean has been registered before. Please create new object for new registration.");
+      _logger.warn("Mbean has been registered before. Please create new object for new registration.");
+      return false;
     }
     updateAttributtInfos(dynamicMetrics, description);
     _objectName = MBeanRegistrar.register(this, objectName);
+    return true;
   }
 
-  protected synchronized void doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
+  protected synchronized boolean doRegister(Collection<DynamicMetric<?, ?>> dynamicMetrics,
       ObjectName objectName) throws JMException {
-    doRegister(dynamicMetrics, null, objectName);
+    return doRegister(dynamicMetrics, null, objectName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
index e732c85..7defef4 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
@@ -19,18 +19,19 @@ package org.apache.helix.integration;
  * under the License.
  */
 
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Set;
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
+
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -42,6 +43,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
@@ -50,6 +52,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;
+import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.RESOURCE_DN_KEY;
 import static org.apache.helix.util.StatusUpdateUtil.ErrorType.RebalanceResourceFailure;
 
 public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
@@ -91,6 +94,9 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
 
     accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
     errorNodeKey = accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
+
+    _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
   }
 
   @BeforeMethod
@@ -99,8 +105,8 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     accessor.removeProperty(errorNodeKey);
   }
 
-  @Test (enabled = false)
-  public void testParticipantUnavailable() {
+  @Test
+  public void testParticipantUnavailable() throws Exception {
     _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name());
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
@@ -119,6 +125,7 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     // Verify there is no rebalance error logged
     Assert.assertNull(accessor.getProperty(errorNodeKey));
     checkRebalanceFailureGauge(false);
+    checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
 
     // kill nodes, so rebalance cannot be done
     for (int i = 0; i < NODE_NR; i++) {
@@ -126,8 +133,10 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     }
 
     // Verify the rebalance error caused by no node available
-    Assert.assertNotNull(pollForError(accessor, errorNodeKey));
+    pollForError(accessor, errorNodeKey);
     checkRebalanceFailureGauge(true);
+    checkResourceBestPossibleCalFailureState(
+        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, testDb);
 
     // clean up
     _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
@@ -138,10 +147,20 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     }
   }
 
-  @Test (enabled = false)
-  public void testTagSetIncorrect() {
+  @Test (dependsOnMethods = "testParticipantUnavailable")
+  public void testTagSetIncorrect() throws Exception {
     _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
         BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name());
+    ZkHelixClusterVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+            .setResources(new HashSet<>(Collections.singleton(testDb))).build();
+    Assert.assertTrue(verifier.verifyByPolling());
+
+    // Verify there is no rebalance error logged
+    Assert.assertNull(accessor.getProperty(errorNodeKey));
+    checkRebalanceFailureGauge(false);
+    checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
+
     // set expected instance tag
     IdealState is =
         _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
@@ -150,15 +169,17 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
 
     // Verify there is rebalance error logged
-    Assert.assertNotNull(pollForError(accessor, errorNodeKey));
+    pollForError(accessor, errorNodeKey);
     checkRebalanceFailureGauge(true);
+    checkResourceBestPossibleCalFailureState(
+        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, testDb);
 
     // clean up
     _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
   }
 
-  @Test (enabled = false)
-  public void testWithDomainId() throws InterruptedException {
+  @Test (dependsOnMethods = "testTagSetIncorrect")
+  public void testWithDomainId() throws Exception {
     int replicas = 2;
     ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
     // 1. disable all participants except one node, then set domain Id
@@ -192,14 +213,17 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     // Verify there is no rebalance error logged
     Assert.assertNull(accessor.getProperty(errorNodeKey));
     checkRebalanceFailureGauge(false);
+    checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
 
     // 2. enable the rest nodes with no domain Id
     for (int i = replicas; i < NODE_NR; i++) {
       setInstanceEnable(_participants[i].getInstanceName(), true, configAccessor);
     }
     // Verify there is rebalance error logged
-    Assert.assertNotNull(pollForError(accessor, errorNodeKey));
+    pollForError(accessor, errorNodeKey);
     checkRebalanceFailureGauge(true);
+    checkResourceBestPossibleCalFailureState(
+        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, testDb);
 
     // 3. reset all nodes domain Id to be correct setting
     for (int i = replicas; i < NODE_NR; i++) {
@@ -211,6 +235,7 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
 
     // Verify that rebalance error state is removed
     checkRebalanceFailureGauge(false);
+    checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
 
     // clean up
     _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
@@ -223,6 +248,14 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
         String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), clusterBeanName));
   }
 
+  // Builds the ObjectName "<domain>:<CLUSTER_DN_KEY>=<cluster>,<RESOURCE_DN_KEY>=<resource>".
+  private ObjectName getResourceMbeanName(String clusterName, String resourceName)
+      throws MalformedObjectNameException {
+    final String domain = MonitorDomainNames.ClusterStatus.name();
+    return new ObjectName(String.format("%s:%s=%s,%s=%s", domain, CLUSTER_DN_KEY, clusterName,
+        RESOURCE_DN_KEY, resourceName));
+  }
+
   private void setDomainId(String instanceName, ConfigAccessor configAccessor) {
     String domain = String.format("Rack=%s, Instance=%s", instanceName, instanceName);
     InstanceConfig instanceConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
@@ -237,30 +270,50 @@ public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
     configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig);
   }
 
-  private void checkRebalanceFailureGauge(boolean expectFailure) {
-    try {
-      Long value = (Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureGauge");
-      Assert.assertNotNull(value);
-      Assert.assertEquals(value == 1, expectFailure);
-    } catch (Exception e) {
-      Assert.fail("Failed to get attribute!");
-    }
+  // Passes once (RebalanceFailureGauge == 1) == expectFailure; lookup errors count as a miss.
+  private void checkRebalanceFailureGauge(final boolean expectFailure) throws Exception {
+    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        try {
+          Object gauge = _server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureGauge");
+          return gauge != null && ((Long) gauge == 1L) == expectFailure;
+        } catch (Exception e) {
+          return false;
+        }
+      }
+    }, 5000));
   }
 
-  private HelixProperty pollForError(HelixDataAccessor accessor, PropertyKey key) {
-    final int POLL_TIMEOUT = 5000;
-    final int POLL_INTERVAL = 100;
-    HelixProperty property = accessor.getProperty(key);
-    int timeWaited = 0;
-    while (property == null && timeWaited < POLL_TIMEOUT) {
-      try {
-        Thread.sleep(POLL_INTERVAL);
-      } catch (InterruptedException e) {
-        return null;
+  // Passes once the resource bean's RebalanceStatus equals expectedState; errors count as a miss.
+  private void checkResourceBestPossibleCalFailureState(
+      final ResourceMonitor.RebalanceStatus expectedState, final String resourceName)
+      throws Exception {
+    boolean matched = TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        try {
+          ObjectName bean = getResourceMbeanName(CLUSTER_NAME, resourceName);
+          return expectedState.name().equals(_server.getAttribute(bean, "RebalanceStatus"));
+        } catch (Exception e) {
+          return false;
+        }
       }
-      timeWaited += POLL_INTERVAL;
-      property = accessor.getProperty(key);
-    }
-    return property;
+    }, 5000);
+    Assert.assertTrue(matched);
+  }
+
+  // Placeholder: the verifier always returns true until the TODO below is re-enabled.
+  private void pollForError(final HelixDataAccessor accessor, final PropertyKey key)
+      throws Exception {
+    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        /* TODO re-enable this check when we start recording rebalance error again
+        return accessor.getProperty(key) != null;
+        */
+        return true;
+      }
+    }, 5000));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
index c89505b..0de2fe3 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit.java
@@ -19,16 +19,17 @@ package org.apache.helix.integration.rebalancer;
  * under the License.
  */
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
+import javax.management.*;
+
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -42,10 +43,12 @@ import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;
+import static org.apache.helix.util.StatusUpdateUtil.ErrorType.RebalanceResourceFailure;
 
 public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     extends ZkTestBase {
@@ -105,12 +108,19 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
 
+  @AfterMethod
+  public void afterMethod() {
+    cleanupRebalanceError(); // clear the RebalanceResourceFailure error node after each test
+  }
+
   @Test
   public void testWithDisabledInstancesLimit() throws Exception {
     MaintenanceSignal maintenanceSignal =
         _dataAccessor.getProperty(_dataAccessor.keyBuilder().maintenance());
     Assert.assertNull(maintenanceSignal);
 
+    checkForRebalanceError(false);
+
     HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
 
     // disable instance
@@ -133,6 +143,8 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     Assert.assertNotNull(maintenanceSignal);
     Assert.assertNotNull(maintenanceSignal.getReason());
 
+    checkForRebalanceError(true);
+
     for (i = 2; i < 2 + _maxOfflineInstancesAllowed + 1; i++) {
       instance = _participants.get(i).getInstanceName();
       admin.enableInstance(CLUSTER_NAME, instance, true);
@@ -146,6 +158,9 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     MaintenanceSignal maintenanceSignal =
         _dataAccessor.getProperty(_dataAccessor.keyBuilder().maintenance());
     Assert.assertNull(maintenanceSignal);
+
+    checkForRebalanceError(false);
+
     int i;
     for (i = 2; i < 2 + _maxOfflineInstancesAllowed; i++) {
       _participants.get(i).syncStop();
@@ -163,19 +178,8 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     Assert.assertNotNull(maintenanceSignal);
     Assert.assertNotNull(maintenanceSignal.getReason());
 
-    // TODO re-enable the check after HELIX-631 is fixed
-    /*
-    // Verify there is no rebalance error logged
-    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
-    PropertyKey errorNodeKey =
-        accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
-    Assert.assertNotNull(accessor.getProperty(errorNodeKey));
-
-    Long value =
-        (Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureGauge");
-    Assert.assertNotNull(value);
-    Assert.assertTrue(value.longValue() > 0);
-    */
+    // Verify there is rebalance error logged
+    checkForRebalanceError(true);
   }
 
   @AfterClass
@@ -193,12 +197,33 @@ public class TestClusterInMaintenanceModeWhenReachingOfflineInstancesLimit
     System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
   }
 
-  private ObjectName getMbeanName(String clusterName)
+  // Asserts the cluster bean's RebalanceFailureGauge is positive exactly when expectError is set.
+  private void checkForRebalanceError(boolean expectError)
+      throws MalformedObjectNameException, AttributeNotFoundException, MBeanException,
+      ReflectionException, InstanceNotFoundException, IOException {
+    /* TODO re-enable this check when we start recording rebalance error again
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+    PropertyKey errorNodeKey =
+        accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());
+    Assert.assertEquals(accessor.getProperty(errorNodeKey) != null, expectError);
+    */
+    ObjectName clusterBean = getClusterMbeanName(CLUSTER_NAME);
+    Long gauge = (Long) _server.getAttribute(clusterBean, "RebalanceFailureGauge");
+    Assert.assertEquals(gauge != null && gauge > 0, expectError);
+  }
+
+  // Removes the controller task error property keyed by RebalanceResourceFailure.
+  private void cleanupRebalanceError() {
+    ZKHelixDataAccessor zkAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+    String errorType = RebalanceResourceFailure.name();
+    zkAccessor.removeProperty(zkAccessor.keyBuilder().controllerTaskError(errorType));
+  }
+
+  private ObjectName getClusterMbeanName(String clusterName)
       throws MalformedObjectNameException {
     String clusterBeanName =
         String.format("%s=%s", CLUSTER_DN_KEY, clusterName);
     return new ObjectName(
         String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), clusterBeanName));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index 143d325..b2daba6 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -205,7 +205,7 @@ public class TestClusterStatusMonitor {
     StateModelDefinition stateModelDef =
         BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition();
 
-    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 0);
 
     Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
     Assert.assertEquals(monitor.getTotalResourceGauge(), 1);
@@ -238,7 +238,7 @@ public class TestClusterStatusMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 0);
     Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
     Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), lessMinActiveReplica);
     Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
@@ -266,7 +266,7 @@ public class TestClusterStatusMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 0);
     Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
     Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
     Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), missTopState);
@@ -291,7 +291,7 @@ public class TestClusterStatusMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, 0);
     Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
     Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
     Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
@@ -313,7 +313,7 @@ public class TestClusterStatusMonitor {
 
     // test pending state transition message report and read
     messageCount = new Random().nextInt(numPartition) + 1;
-    monitor.updatePendingMessages(testDB, messageCount);
+    monitor.setResourceStatus(externalView, idealState, stateModelDef, messageCount);
     Assert.assertEquals(monitor.getPendingStateTransitionGuage(), messageCount);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
index fbbf4b8..0c8ebe7 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDisableResourceMbean.java
@@ -88,9 +88,9 @@ public class TestDisableResourceMbean extends ZkUnitTestBase {
     Assert.assertTrue(clusterVerifier.verifyByPolling());
 
     // Verify the bean was created for TestDB0, but not for TestDB1.
-    Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB0", clusterName)));
-    Assert.assertFalse(_mbeanServer.isRegistered(getMbeanName("TestDB1", clusterName)));
-    Assert.assertTrue(_mbeanServer.isRegistered(getMbeanName("TestDB2", clusterName)));
+    pollForMBeanExistance(getMbeanName("TestDB0", clusterName), true);
+    pollForMBeanExistance(getMbeanName("TestDB1", clusterName), false);
+    pollForMBeanExistance(getMbeanName("TestDB2", clusterName), true);
 
     controller.syncStop();
     for (MockParticipantManager participant : participants) {
@@ -100,6 +100,17 @@ public class TestDisableResourceMbean extends ZkUnitTestBase {
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
+  // Runs the isRegistered check through TestHelper.verify and asserts the expected outcome.
+  private void pollForMBeanExistance(final ObjectName objectName, boolean expectation)
+      throws Exception {
+    Assert.assertEquals(TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() throws Exception {
+        return _mbeanServer.isRegistered(objectName);
+      }
+    }, 3000), expectation);
+  }
+
   private ObjectName getMbeanName(String resourceName, String clusterName)
       throws MalformedObjectNameException {
     String clusterBeanName =

http://git-wip-us.apache.org/repos/asf/helix/blob/2f39f381/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index 5310ded..713fd65 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -62,7 +62,7 @@ public class TestResourceMonitor {
     StateModelDefinition stateModelDef =
         BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition();
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -88,7 +88,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), errorCount);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), errorCount);
@@ -119,7 +119,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessMinActiveReplica);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -151,7 +151,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), lessReplica);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -181,7 +181,7 @@ public class TestResourceMonitor {
       externalView.setStateMap(partition, map);
     }
 
-    monitor.updateResource(externalView, idealState, stateModelDef);
+    monitor.updateResourceState(externalView, idealState, stateModelDef);
 
     Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), missTopState);
     Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
@@ -196,6 +196,17 @@ public class TestResourceMonitor {
     int messageCount = new Random().nextInt(_partitions) + 1;
     monitor.updatePendingStateTransitionMessages(messageCount);
     Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), messageCount);
+
+    Assert
+        .assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.UNKNOWN.name());
+    monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.NORMAL);
+    Assert.assertEquals(monitor.getRebalanceState(), ResourceMonitor.RebalanceStatus.NORMAL.name());
+    monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED);
+    Assert.assertEquals(monitor.getRebalanceState(),
+        ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED.name());
+    monitor.setRebalanceState(ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED);
+    Assert.assertEquals(monitor.getRebalanceState(),
+        ResourceMonitor.RebalanceStatus.INTERMEDIATE_STATE_CAL_FAILED.name());
   }
 
   /**