You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/13 22:56:16 UTC

helix git commit: [HELIX-675] Refactor controller start/cleanup logic to ensure monitor register/reset is handled in any event order.

Repository: helix
Updated Branches:
  refs/heads/master 5383e7318 -> b6fd8cb83


[HELIX-675] Refactor controller start/cleanup logic to ensure monitor register/reset is handled in any event order.

Because events can be processed in different orders, the controller init event might be processed later or earlier than expected.
This causes inconsistency when the event handler threads process events and record information in the cluster monitor.
This change ensures the cluster monitor is turned off when leadership moves to another node, so no extra metric data will be generated.

Also upgrade related test cases to verify MBean counts.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b6fd8cb8
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b6fd8cb8
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b6fd8cb8

Branch: refs/heads/master
Commit: b6fd8cb8396e1453afba404b669534883ce796a7
Parents: 5383e73
Author: jiajunwang <er...@gmail.com>
Authored: Thu Mar 8 16:51:37 2018 -0800
Committer: jiajunwang <er...@gmail.com>
Committed: Tue Mar 13 15:55:44 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      | 262 +++++++++----------
 .../stages/CurrentStateComputationStage.java    |  11 +-
 .../stages/ExternalViewComputeStage.java        |  55 ++--
 .../controller/stages/TaskAssignmentStage.java  |   4 +-
 .../manager/zk/DistributedLeaderElection.java   |   5 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java |  37 +--
 .../org/apache/helix/task/TaskRebalancer.java   |  14 +-
 .../TestClusterEventStatusMonitor.java          |   1 +
 .../TestClusterStatusMonitorLifecycle.java      | 144 +++++-----
 .../mbeans/TestClusterStatusMonitor.java        |   1 +
 .../mbeans/TestRebalancerMetrics.java           |   8 +-
 .../mbeans/TestTopStateHandoffMetrics.java      |   4 +-
 12 files changed, 284 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 2546bd2..933aa3e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -19,18 +19,6 @@ package org.apache.helix.controller;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -38,51 +26,25 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.api.listeners.ClusterConfigChangeListener;
-import org.apache.helix.api.listeners.ControllerChangeListener;
-import org.apache.helix.api.listeners.CurrentStateChangeListener;
-import org.apache.helix.api.listeners.IdealStateChangeListener;
-import org.apache.helix.api.listeners.InstanceConfigChangeListener;
-import org.apache.helix.api.listeners.LiveInstanceChangeListener;
-import org.apache.helix.api.listeners.MessageListener;
-import org.apache.helix.api.listeners.PreFetch;
-import org.apache.helix.api.listeners.ResourceConfigChangeListener;
+import org.apache.helix.api.listeners.*;
+import org.apache.helix.common.ClusterEventBlockingQueue;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.common.ClusterEventBlockingQueue;
-import org.apache.helix.controller.stages.ClusterEventType;
-import org.apache.helix.controller.stages.CompatibilityCheckStage;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.ExternalViewComputeStage;
-import org.apache.helix.controller.stages.IntermediateStateCalcStage;
-import org.apache.helix.controller.stages.MessageGenerationPhase;
-import org.apache.helix.controller.stages.MessageSelectionStage;
-import org.apache.helix.controller.stages.MessageThrottleStage;
-import org.apache.helix.controller.stages.PersistAssignmentStage;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.controller.stages.ResourceValidationStage;
-import org.apache.helix.controller.stages.TargetExteralViewCalcStage;
-import org.apache.helix.controller.stages.TaskAssignmentStage;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.MaintenanceSignal;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.PauseSignal;
-import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.controller.stages.*;
+import org.apache.helix.model.*;
 import org.apache.helix.monitoring.mbeans.ClusterEventMonitor;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.task.TaskDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 import static org.apache.helix.HelixConstants.ChangeType;
 
 /**
@@ -113,7 +75,10 @@ public class GenericHelixController implements IdealStateChangeListener,
   final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
   final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
 
-  ClusterStatusMonitor _clusterStatusMonitor;
+  // By default not reporting status until controller status is changed to activate
+  // TODO This flag should be inside ClusterStatusMonitor. When false, no MBean registering.
+  private boolean _isMonitoring = false;
+  private final ClusterStatusMonitor _clusterStatusMonitor;
 
   /**
    * A queue for controller events and a thread that will consume it
@@ -192,10 +157,10 @@ public class GenericHelixController implements IdealStateChangeListener,
 
       NotificationContext changeContext = new NotificationContext(_manager);
       changeContext.setType(NotificationContext.Type.CALLBACK);
-      ClusterEvent event = new ClusterEvent(_clusterName,ClusterEventType.PeriodicalRebalance);
+      ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.PeriodicalRebalance);
       event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
       event.addAttribute(AttributeName.changeContext.name(), changeContext);
-      List<ZNRecord> dummy = new ArrayList<ZNRecord>();
+      List<ZNRecord> dummy = new ArrayList<>();
       event.addAttribute(AttributeName.eventData.name(), dummy);
       // Should be able to process
       _eventQueue.put(event);
@@ -311,6 +276,8 @@ public class GenericHelixController implements IdealStateChangeListener,
 
     initPipelines(_eventThread, _cache, false);
     initPipelines(_taskEventThread, _taskCache, true);
+
+    _clusterStatusMonitor = new ClusterStatusMonitor(_clusterName);
   }
 
   /**
@@ -331,6 +298,9 @@ public class GenericHelixController implements IdealStateChangeListener,
       return;
     }
 
+    // TODO If init controller with paused = true, it may not take effect immediately
+    // _paused is default false. If any events come before controllerChangeEvent, the controller
+    // will be excuting in un-paused mode. Which might not be the config in ZK.
     if (_paused) {
       logger.info("Cluster " + manager.getClusterName() + " is paused. Ignoring the event:" + event
           .getEventType());
@@ -342,38 +312,28 @@ public class GenericHelixController implements IdealStateChangeListener,
       context = event.getAttribute(AttributeName.changeContext.name());
     }
 
-    // Initialize _clusterStatusMonitor
     if (context != null) {
       if (context.getType() == Type.FINALIZE) {
         stopRebalancingTimer();
         logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getEventType());
         return;
       } else {
-        synchronized (this) {
-          if (_clusterStatusMonitor == null) {
-            _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName());
-          }
-        }
-        // TODO: should be in the initization of controller.
+        // TODO: should be in the initialization of controller.
         if (_cache != null) {
           checkRebalancingTimer(manager, Collections.EMPTY_LIST, _cache.getClusterConfig());
         }
-
-        if (cache.isTaskCache()) {
-          TaskDriver driver = new TaskDriver(manager);
-          _clusterStatusMonitor.refreshWorkflowsStatus(driver);
-          _clusterStatusMonitor.refreshJobsStatus(driver);
+        if (_isMonitoring) {
+          event.addAttribute(AttributeName.clusterStatusMonitor.name(), _clusterStatusMonitor);
         }
-        event.addAttribute(AttributeName.clusterStatusMonitor.name(), _clusterStatusMonitor);
       }
     }
 
     // add the cache
     event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
 
-    List<Pipeline> pipelines = cache.isTaskCache()
-        ? _taskRegistry.getPipelinesForEvent(event.getEventType())
-        : _registry.getPipelinesForEvent(event.getEventType());
+    List<Pipeline> pipelines = cache.isTaskCache() ?
+        _taskRegistry.getPipelinesForEvent(event.getEventType()) :
+        _registry.getPipelinesForEvent(event.getEventType());
 
     if (pipelines == null || pipelines.size() == 0) {
       logger.info(
@@ -404,40 +364,51 @@ public class GenericHelixController implements IdealStateChangeListener,
 
     if (!cache.isTaskCache()) {
       // report event process durations
-      if (_clusterStatusMonitor != null) {
-        NotificationContext notificationContext =
-            event.getAttribute(AttributeName.changeContext.name());
-        long enqueueTime = event.getCreationTime();
-        long zkCallbackTime;
-        StringBuilder sb = new StringBuilder();
-        if (notificationContext != null) {
-          zkCallbackTime = notificationContext.getCreationTime();
+      NotificationContext notificationContext =
+          event.getAttribute(AttributeName.changeContext.name());
+      long enqueueTime = event.getCreationTime();
+      long zkCallbackTime;
+      StringBuilder sb = new StringBuilder();
+      if (notificationContext != null) {
+        zkCallbackTime = notificationContext.getCreationTime();
+        if (_isMonitoring) {
           _clusterStatusMonitor
               .updateClusterEventDuration(ClusterEventMonitor.PhaseName.Callback.name(),
                   enqueueTime - zkCallbackTime);
-          sb.append(String.format(
-              "Callback time for event: " + event.getEventType() + " took: " + (enqueueTime
-                  - zkCallbackTime) + " ms\n"));
-
         }
+        sb.append(String.format(
+            "Callback time for event: " + event.getEventType() + " took: " + (enqueueTime
+                - zkCallbackTime) + " ms\n"));
+      }
+      if (_isMonitoring) {
         _clusterStatusMonitor
             .updateClusterEventDuration(ClusterEventMonitor.PhaseName.InQueue.name(),
                 startTime - enqueueTime);
         _clusterStatusMonitor
             .updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(),
                 endTime - startTime);
-        sb.append(String.format(
-            "InQueue time for event: " + event.getEventType() + " took: " + (startTime
-                - enqueueTime) + " ms\n"));
-        sb.append(String.format(
-            "TotalProcessed time for event: " + event.getEventType() + " took: " + (endTime
-                - startTime) + " ms"));
-        logger.info(sb.toString());
       }
-    }
+      sb.append(String.format(
+          "InQueue time for event: " + event.getEventType() + " took: " + (startTime - enqueueTime)
+              + " ms\n"));
+      sb.append(String.format(
+          "TotalProcessed time for event: " + event.getEventType() + " took: " + (endTime
+              - startTime) + " ms"));
+      logger.info(sb.toString());
+    } else if (_isMonitoring) {
+      // report workflow status
+      TaskDriver driver = new TaskDriver(manager);
+      _clusterStatusMonitor.refreshWorkflowsStatus(driver);
+      _clusterStatusMonitor.refreshJobsStatus(driver);
+    }
+
+    // If event handling happens before controller deactivate, the process may write unnecessary
+    // MBeans to monitoring after the monitor is disabled.
+    // So reset ClusterStatusMonitor according to it's status after all event handling.
+    // TODO remove this once clusterStatusMonitor blocks any MBean register on isMonitoring = false.
+    resetClusterStatusMonitor();
   }
 
-
   @Override
   @PreFetch(enabled = false)
   public void onStateChange(String instanceName, List<CurrentState> statesInfo,
@@ -457,7 +428,8 @@ public class GenericHelixController implements IdealStateChangeListener,
     notifyCaches(changeContext, ChangeType.MESSAGE);
     pushToEventQueues(ClusterEventType.MessageChange, changeContext,
         Collections.<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName));
-    if (_clusterStatusMonitor != null && messages != null) {
+
+    if (_isMonitoring && messages != null) {
       _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
     }
 
@@ -608,42 +580,36 @@ public class GenericHelixController implements IdealStateChangeListener,
   @Override
   public void onControllerChange(NotificationContext changeContext) {
     logger.info("START: GenericClusterController.onControllerChange() for cluster " + _clusterName);
+
     _cache.requireFullRefresh();
     _taskCache.requireFullRefresh();
-    if (changeContext != null && changeContext.getType() == Type.FINALIZE) {
-      logger.info("GenericClusterController.onControllerChange() FINALIZE for cluster " + _clusterName);
-      return;
-    }
-    HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
 
-    // double check if this controller is the leader
-    Builder keyBuilder = accessor.keyBuilder();
-    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
-    if (leader == null) {
-      logger
-          .warn("No controller exists for cluster:" + changeContext.getManager().getClusterName());
-      return;
-    } else {
-      String leaderName = leader.getInstanceName();
+    boolean controllerIsLeader;
 
-      String instanceName = changeContext.getManager().getInstanceName();
-      if (leaderName == null || !leaderName.equals(instanceName)) {
-        logger.warn("leader name does NOT match, my name: " + instanceName + ", leader: " + leader);
-        return;
-      }
+    if (changeContext != null && changeContext.getType() == Type.FINALIZE) {
+      logger.info(
+          "GenericClusterController.onControllerChange() FINALIZE for cluster " + _clusterName);
+      controllerIsLeader = false;
+    } else {
+      // double check if this controller is the leader
+      controllerIsLeader = changeContext.getManager().isLeader();
+    }
+
+    HelixManager manager = changeContext.getManager();
+    if (controllerIsLeader) {
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      Builder keyBuilder = accessor.keyBuilder();
+      PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
+      MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance());
+      _paused = updateControllerState(changeContext, pauseSignal, _paused);
+      _inMaintenanceMode =
+          updateControllerState(changeContext, maintenanceSignal, _inMaintenanceMode);
+      enableClusterStatusMonitor(true);
+      _clusterStatusMonitor.setEnabled(!_paused);
+    } else {
+      enableClusterStatusMonitor(false);
     }
 
-    PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
-    MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance());
-    _paused = updateControllerState(changeContext, pauseSignal, _paused);
-    _inMaintenanceMode = updateControllerState(changeContext, maintenanceSignal, _inMaintenanceMode);
-
-    synchronized (this) {
-      if (_clusterStatusMonitor == null) {
-        _clusterStatusMonitor = new ClusterStatusMonitor(changeContext.getManager().getClusterName());
-      }
-    }
-    _clusterStatusMonitor.setEnabled(!_paused);
     logger.info("END: GenericClusterController.onControllerChange() for cluster " + _clusterName);
   }
 
@@ -656,7 +622,7 @@ public class GenericHelixController implements IdealStateChangeListener,
       NotificationContext changeContext) {
 
     // construct maps for current live-instances
-    Map<String, LiveInstance> curInstances = new HashMap<String, LiveInstance>();
+    Map<String, LiveInstance> curInstances = new HashMap<>();
     Map<String, LiveInstance> curSessions = new HashMap<>();
     for (LiveInstance liveInstance : liveInstances) {
       curInstances.put(liveInstance.getInstanceName(), liveInstance);
@@ -723,21 +689,51 @@ public class GenericHelixController implements IdealStateChangeListener,
     }
   }
 
-  public void shutdownClusterStatusMonitor(String clusterName) {
-    if (_clusterStatusMonitor != null) {
-      logger.info("Shut down _clusterStatusMonitor for cluster " + clusterName);
-      _clusterStatusMonitor.reset();
-      _clusterStatusMonitor = null;
-    }
-  }
-
   public void shutdown() throws InterruptedException {
     stopRebalancingTimer();
 
     terminateEventThread(_eventThread);
     terminateEventThread(_taskEventThread);
 
-    _asyncTasksThreadPool.shutdown();
+    // shutdown asycTasksThreadpool and wait for terminate.
+    _asyncTasksThreadPool.shutdownNow();
+    try {
+      _asyncTasksThreadPool.awaitTermination(EVENT_THREAD_JOIN_TIMEOUT, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException ex) {
+      logger.warn("Timeout when terminating async tasks. Some async tasks are still executing.");
+    }
+
+    enableClusterStatusMonitor(false);
+
+    // TODO controller shouldn't be used in anyway after shutdown.
+    // Need to record shutdown and throw Exception if the controller is used again.
+  }
+
+  private void enableClusterStatusMonitor(boolean enable) {
+    synchronized (_clusterStatusMonitor) {
+      if (_isMonitoring != enable) {
+        // monitoring state changed
+        if (enable) {
+          logger.info("Enable clusterStatusMonitor for cluster " + _clusterName);
+          _clusterStatusMonitor.active();
+        } else {
+          logger.info("Disable clusterStatusMonitor for cluster " + _clusterName);
+          // Reset will be done if (_isMonitoring = false) later, no matter if the state is changed or not.
+        }
+        _isMonitoring = enable;
+      }
+      // Due to multithreads processing, async thread may write to monitor even it is closed.
+      // So when it is disabled, always try to clear the monitor.
+      resetClusterStatusMonitor();
+    }
+  }
+
+  private void resetClusterStatusMonitor() {
+    synchronized (_clusterStatusMonitor) {
+      if (!_isMonitoring) {
+        _clusterStatusMonitor.reset();
+      }
+    }
   }
 
   private void terminateEventThread(Thread thread) throws InterruptedException {
@@ -784,12 +780,12 @@ public class GenericHelixController implements IdealStateChangeListener,
       _eventBlockingQueue = eventBlockingQueue;
     }
 
-    @Override public void run() {
+    @Override
+    public void run() {
       logger.info("START ClusterEventProcessor thread  for cluster " + _clusterName);
       while (!isInterrupted()) {
         try {
-          ClusterEvent event = _eventBlockingQueue.take();
-          handleEvent(event, _cache);
+          handleEvent(_eventBlockingQueue.take(), _cache);
         } catch (InterruptedException e) {
           logger.warn("ClusterEventProcessor interrupted", e);
           interrupt();

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 3c2c977..f8f5a2f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -219,8 +219,9 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
         long startTime = missingTopStateMap.get(resourceName).get(partitionName);
         if (startTime > 0 && System.currentTimeMillis() - startTime > durationThreshold) {
           missingTopStateMap.get(resourceName).put(partitionName, TRANSITION_FAILED);
-          clusterStatusMonitor
-              .updateMissingTopStateDurationStats(resourceName, 0L, false);
+          if (clusterStatusMonitor != null) {
+            clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, false);
+          }
         }
       }
     }
@@ -294,8 +295,10 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     if (handOffStartTime != TRANSITION_FAILED && handOffEndTime - handOffStartTime <= threshold) {
       LOG.info(String.format("Missing topstate duration is %d for partition %s",
           handOffEndTime - handOffStartTime, partition.getPartitionName()));
-      clusterStatusMonitor.updateMissingTopStateDurationStats(resource.getResourceName(),
-          handOffEndTime - handOffStartTime, true);
+      if (clusterStatusMonitor != null) {
+        clusterStatusMonitor.updateMissingTopStateDurationStats(resource.getResourceName(),
+            handOffEndTime - handOffStartTime, true);
+      }
     }
     removeFromStatsMap(missingTopStateMap, resource, partition);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 8d1a48c..9e91ba5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -19,39 +19,20 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixManager;
-import org.apache.helix.PropertyKey;
+import org.apache.helix.*;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZNRecordDelta;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
-import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
+import org.apache.helix.model.*;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceConfig;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.*;
+
 public class ExternalViewComputeStage extends AbstractBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(ExternalViewComputeStage.class);
 
@@ -106,22 +87,24 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
       // Update cluster status monitor mbean
       IdealState idealState = cache.getIdealState(resourceName);
       if (!cache.isTaskCache()) {
+        ResourceConfig resourceConfig = cache.getResourceConfig(resourceName);
         ClusterStatusMonitor clusterStatusMonitor =
             event.getAttribute(AttributeName.clusterStatusMonitor.name());
-        ResourceConfig resourceConfig = cache.getResourceConfig(resourceName);
-        if (idealState != null && (resourceConfig == null || !resourceConfig
-            .isMonitoringDisabled())) {
-          if (clusterStatusMonitor != null && !idealState.getStateModelDefRef()
-              .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-            StateModelDefinition stateModelDef =
-                cache.getStateModelDef(idealState.getStateModelDefRef());
-            clusterStatusMonitor
-                .setResourceStatus(view, cache.getIdealState(view.getResourceName()),
-                    stateModelDef);
+        if (clusterStatusMonitor != null) {
+          if (idealState != null && (resourceConfig == null || !resourceConfig
+              .isMonitoringDisabled())) {
+            if (!idealState.getStateModelDefRef()
+                .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+              StateModelDefinition stateModelDef =
+                  cache.getStateModelDef(idealState.getStateModelDefRef());
+              clusterStatusMonitor
+                  .setResourceStatus(view, cache.getIdealState(view.getResourceName()),
+                      stateModelDef);
+            }
+          } else {
+            // Drop the metrics if the resource is dropped, or the MonitorDisabled is changed to true.
+            clusterStatusMonitor.unregisterResource(view.getResourceName());
           }
-        } else {
-          // Drop the metrics if the resource is dropped, or the MonitorDisabled is changed to true.
-          clusterStatusMonitor.unregisterResource(view.getResourceName());
         }
       }
       ExternalView curExtView = curExtViews.get(resourceName);

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index ed6ede2..d8ccd0f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -77,7 +77,9 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     if (!cache.isTaskCache()) {
       ClusterStatusMonitor clusterStatusMonitor =
           event.getAttribute(AttributeName.clusterStatusMonitor.name());
-      clusterStatusMonitor.increaseMessageReceived(outputMessages);
+      if (clusterStatusMonitor != null) {
+        clusterStatusMonitor.increaseMessageReceived(outputMessages);
+      }
     }
     long cacheStart = System.currentTimeMillis();
     cache.cacheMessages(outputMessages);

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index edc467b..6022edd 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -22,7 +22,6 @@ package org.apache.helix.manager.zk;
 import java.lang.management.ManagementFactory;
 import java.util.List;
 
-import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixTimerTask;
@@ -30,6 +29,7 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyType;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.model.LeaderHistory;
 import org.apache.helix.model.LiveInstance;
@@ -97,11 +97,10 @@ public class DistributedLeaderElection implements ControllerChangeListener {
           }
         }
       } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
-        LOG.info(_manager.getInstanceName() + " reqlinquish leadership for cluster: "
+        LOG.info(_manager.getInstanceName() + " relinquish leadership for cluster: "
             + _manager.getClusterName());
         controllerHelper.stopControllerTimerTasks();
         controllerHelper.removeListenersFromController(_controller);
-        _controller.shutdownClusterStatusMonitor(_manager.getClusterName());
 
         /**
          * clear write-through cache

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 2a99341..fe682ac 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -85,11 +85,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   public ClusterStatusMonitor(String clusterName) {
     _clusterName = clusterName;
     _beanServer = ManagementFactory.getPlatformMBeanServer();
-    try {
-      register(this, getObjectName(clusterBeanName()));
-    } catch (Exception e) {
-      LOG.error("Fail to regiter ClusterStatusMonitor", e);
-    }
   }
 
   public ObjectName getObjectName(String name) throws MalformedObjectNameException {
@@ -275,20 +270,21 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   private ClusterEventMonitor getOrCreateClusterEventMonitor(String phase) {
-    if (!_clusterEventMbeanMap.containsKey(phase)) {
-      ClusterEventMonitor monitor = new ClusterEventMonitor(this, phase);
-      try {
-        ClusterEventMonitor prevEventMbean = _clusterEventMbeanMap.put(phase, monitor);
-        if (prevEventMbean != null) {
-          prevEventMbean.unregister();
+    try {
+      if (!_clusterEventMbeanMap.containsKey(phase)) {
+        synchronized (this) {
+          if (!_clusterEventMbeanMap.containsKey(phase)) {
+            ClusterEventMonitor monitor = new ClusterEventMonitor(this, phase);
+            monitor.register();
+            _clusterEventMbeanMap.put(phase, monitor);
+          }
         }
-        monitor.register();
-      } catch (JMException e) {
-        LOG.error("Failed to register ClusterEventMonitorMbean for cluster " + _clusterName
-            + " and phase type: " + phase, e);
-        return null;
       }
+    } catch (JMException e) {
+      LOG.error("Failed to register ClusterEventMonitorMbean for cluster " + _clusterName
+          + " and phase type: " + phase, e);
     }
+
     return _clusterEventMbeanMap.get(phase);
   }
 
@@ -477,6 +473,15 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     _instanceMsgQueueSizes.put(instanceName, msgQueueSize);
   }
 
+  public void active() {
+    LOG.info("Active ClusterStatusMonitor");
+    try {
+      register(this, getObjectName(clusterBeanName()));
+    } catch (Exception e) {
+      LOG.error("Fail to register ClusterStatusMonitor", e);
+    }
+  }
+
   public void reset() {
     LOG.info("Reset ClusterStatusMonitor");
     try {

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 3d3f86e..19a4049 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -78,13 +78,16 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
         failedJobs ++;
         if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) {
           ctx.setWorkflowState(TaskState.FAILED);
-          _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.FAILED);
+          if (_clusterStatusMonitor != null) {
+            _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.FAILED);
+          }
           for (String jobToFail : cfg.getJobDag().getAllNodes()) {
             if (ctx.getJobState(jobToFail) == TaskState.IN_PROGRESS) {
               ctx.setJobState(jobToFail, TaskState.ABORTED);
               // Skip aborted jobs latency since they are not accurate latency for job running time
-              _clusterStatusMonitor
-                  .updateJobCounters(jobConfigMap.get(jobToFail), TaskState.ABORTED);
+              if (_clusterStatusMonitor != null) {
+                _clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobToFail), TaskState.ABORTED);
+              }
             }
           }
           return true;
@@ -98,8 +101,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
 
     if (!incomplete && cfg.isTerminable()) {
       ctx.setWorkflowState(TaskState.COMPLETED);
-      _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED,
-          ctx.getFinishTime() - ctx.getStartTime());
+      if (_clusterStatusMonitor != null) {
+        _clusterStatusMonitor.updateWorkflowCounters(cfg, TaskState.COMPLETED, ctx.getFinishTime() - ctx.getStartTime());
+      }
       return true;
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
index b607add..69b553c 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
@@ -47,6 +47,7 @@ public class TestClusterEventStatusMonitor {
   private class ClusterStatusMonitorForTest extends ClusterStatusMonitor {
     public ClusterStatusMonitorForTest(String clusterName) {
       super(clusterName);
+      active();
     }
     public ConcurrentHashMap<String, ClusterEventMonitor> getClusterEventMBean() {
       return _clusterEventMbeanMap;

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 3db8237..ccaba4e 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -19,14 +19,6 @@ package org.apache.helix.monitoring;
  * under the License.
  */
 
-import java.io.IOException;
-import java.util.Date;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServerConnection;
-import javax.management.MBeanServerNotification;
-import javax.management.MalformedObjectNameException;
-
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.integration.common.ZkIntegrationTestBase;
@@ -34,7 +26,6 @@ import org.apache.helix.integration.manager.ClusterDistributedController;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
-import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
@@ -45,8 +36,16 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import javax.management.*;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
 public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
-  private static final Logger LOG = LoggerFactory.getLogger(TestClusterStatusMonitorLifecycle.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestClusterStatusMonitorLifecycle.class);
 
   MockParticipantManager[] _participants;
   ClusterDistributedController[] _controllers;
@@ -60,11 +59,10 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
   @BeforeClass
   public void beforeClass() throws Exception {
     String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
-    _clusterNamePrefix = className + "_" + methodName;
+    _clusterNamePrefix = className;
 
-    System.out.println("START " + _clusterNamePrefix + " at "
-        + new Date(System.currentTimeMillis()));
+    System.out
+        .println("START " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
 
     // setup 10 clusters
     for (int i = 0; i < clusterNb; i++) {
@@ -83,8 +81,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
 
     // setup controller cluster
     _controllerClusterName = "CONTROLLER_" + _clusterNamePrefix;
-    TestHelper.setupCluster("CONTROLLER_" + _clusterNamePrefix, ZK_ADDR, 0, // controller
-                                                                            // port
+    TestHelper.setupCluster(_controllerClusterName, ZK_ADDR,
+        0, // controller port
         "controller", // participant name prefix
         _clusterNamePrefix, // resource name prefix
         1, // resources
@@ -101,10 +99,9 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
       _controllers[i].syncStart();
     }
 
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(
-            new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _controllerClusterName),
-            30000);
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _controllerClusterName),
+        30000);
     Assert.assertTrue(result, "Controller cluster NOT in ideal state");
 
     // start first cluster
@@ -116,9 +113,8 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
       _participants[i].syncStart();
     }
 
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            _firstClusterName));
+    result = ClusterStateVerifier
+        .verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, _firstClusterName));
     Assert.assertTrue(result, "first cluster NOT in ideal state");
 
     // add more controllers to controller cluster
@@ -135,32 +131,23 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     }
 
     // verify controller cluster
-    result =
-        ClusterStateVerifier
-            .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
-                _controllerClusterName));
+    result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _controllerClusterName));
     Assert.assertTrue(result, "Controller cluster NOT in ideal state");
 
     // verify first cluster
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            _firstClusterName));
+    result = ClusterStateVerifier
+        .verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, _firstClusterName));
     Assert.assertTrue(result, "first cluster NOT in ideal state");
   }
 
   @AfterClass
   public void afterClass() {
     System.out.println("Cleaning up...");
-    for (int i = 0; i < 5; i++) {
-      _controllers[i].syncStop();
-    }
-
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < _participants.length; i++) {
       _participants[i].syncStop();
     }
-
     System.out.println("END " + _clusterNamePrefix + " at " + new Date(System.currentTimeMillis()));
-
   }
 
   class ParticipantMonitorListener extends ClusterMBeanObserver {
@@ -168,8 +155,9 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     int _nMbeansUnregistered = 0;
     int _nMbeansRegistered = 0;
 
-    public ParticipantMonitorListener(String domain) throws InstanceNotFoundException, IOException,
-        MalformedObjectNameException, NullPointerException {
+    public ParticipantMonitorListener(String domain)
+        throws InstanceNotFoundException, IOException, MalformedObjectNameException,
+        NullPointerException {
       super(domain);
     }
 
@@ -188,14 +176,21 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     }
   }
 
-  @Test (enabled = false)
-  public void testClusterStatusMonitorLifecycle() throws InstanceNotFoundException,
-      MalformedObjectNameException, NullPointerException, IOException, InterruptedException {
-    ParticipantMonitorListener listener =
-        new ParticipantMonitorListener(MonitorDomainNames.ClusterStatus.name());
+  private void cleanupControllers() {
+    for (int i = 0; i < _controllers.length; i++) {
+      _controllers[i].syncStop();
+    }
+  }
 
-    int nMbeansUnregistered = listener._nMbeansUnregistered;
-    int nMbeansRegistered = listener._nMbeansRegistered;
+  @Test
+  public void testClusterStatusMonitorLifecycle()
+      throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException,
+      IOException, InterruptedException {
+    // Only keep MBeans whose SensorName contains this test's cluster name prefix
+    QueryExp exp =
+        Query.match(Query.attr("SensorName"), Query.value("*" + _clusterNamePrefix + "*"));
+    Set<ObjectInstance> mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
 
     _participants[0].disconnect();
 
@@ -203,8 +198,10 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     // No change in instance/resource mbean
     // Unregister 1 per-instance resource mbean and message queue mbean
     Thread.sleep(1000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 2);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered);
+    int previousMBeanCount = mbeans.size();
+    mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
+    Assert.assertEquals(mbeans.size(), previousMBeanCount - 2);
 
     HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
     String firstControllerName =
@@ -217,13 +214,14 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
       }
     }
     firstController.disconnect();
-    Thread.sleep(2000);
 
-    // 1 cluster status monitor, 1 resource monitor, 5 instances
-    // Unregister 1+4+1 per-instance resource mbean
-    // Register 4 per-instance resource mbean
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 33);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 29);
+    // 1 controller goes away
+    // Unregister 2 message queue mbeans and 1 PerInstanceResource mbean
+    Thread.sleep(2000);
+    previousMBeanCount = mbeans.size();
+    mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
+    Assert.assertEquals(mbeans.size(), previousMBeanCount - 3);
 
     String instanceName = "localhost0_" + (12918 + 0);
     _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
@@ -231,10 +229,12 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
 
     // 1 participant comes back
     // No change in instance/resource mbean
-    // Register 1 per-instance resource mbean
+    // Register 1 per-instance resource mbean and 1 message queue mbean
     Thread.sleep(2000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 33);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 31);
+    previousMBeanCount = mbeans.size();
+    mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
+    Assert.assertEquals(mbeans.size(), previousMBeanCount + 2);
 
     // Add a resource
     // Register 1 resource mbean
@@ -248,15 +248,37 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
         Integer.parseInt(idealState.getReplicas()));
 
     Thread.sleep(2000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 33);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 37);
+    // One new resource adds 1 PerInstanceResource mbean per participant and 1 resource monitor
+    previousMBeanCount = mbeans.size();
+    mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
+    Assert.assertEquals(mbeans.size(), previousMBeanCount + _participants.length + 1);
 
     // Remove a resource
     // No change in instance/resource mbean
     // Unregister 5 per-instance resource mbean
     setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
     Thread.sleep(2000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 39);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 37);
+    previousMBeanCount = mbeans.size();
+    mbeans = new HashSet<>(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
+    Assert.assertEquals(mbeans.size(), previousMBeanCount - (_participants.length + 1));
+
+    // Clean up the controllers; all MBeans should then be removed.
+    cleanupControllers();
+    Thread.sleep(2000);
+
+    // Check whether any MBeans are left over.
+    // Note that MessageQueueStatus is not bound to the controller only, so it will still exist.
+    exp = Query
+        .and(Query.not(Query.match(Query.attr("SensorName"), Query.value("MessageQueueStatus.*"))),
+            exp);
+    if (!ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp).isEmpty()) {
+      System.out.println(ManagementFactory.getPlatformMBeanServer()
+          .queryMBeans(new ObjectName("ClusterStatus:*"), exp));
+    }
+    Assert.assertTrue(ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp).isEmpty());
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index 723d969..755997b 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -54,6 +54,7 @@ public class TestClusterStatusMonitor {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+    monitor.active();
     ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
     try {
       _server.getMBeanInfo(clusterMonitorObjName);

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
index df88397..61acdb5 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
@@ -66,7 +66,9 @@ public class TestRebalancerMetrics extends BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
-    event.addAttribute(AttributeName.clusterStatusMonitor.name(), new ClusterStatusMonitor(_clusterName));
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
+    monitor.active();
+    event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
 
     runStage(event, new ReadClusterDataStage());
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
@@ -110,7 +112,9 @@ public class TestRebalancerMetrics extends BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
-    event.addAttribute(AttributeName.clusterStatusMonitor.name(), new ClusterStatusMonitor(_clusterName));
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
+    monitor.active();
+    event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
 
     runStage(event, new ReadClusterDataStage());
     runStage(event, new BestPossibleStateCalcStage());

http://git-wip-us.apache.org/repos/asf/helix/blob/b6fd8cb8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
index 51b048d..7b97ac3 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
@@ -54,7 +54,9 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
     resource.setStateModelDefRef("MasterSlave");
     resource.addPartition(PARTITION);
     event.addAttribute(AttributeName.RESOURCES.name(), Collections.singletonMap(TEST_RESOURCE, resource));
-    event.addAttribute(AttributeName.clusterStatusMonitor.name(), new ClusterStatusMonitor("TestCluster"));
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor("TestCluster");
+    monitor.active();
+    event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
   }
 
   @Test(dataProvider = "successCurrentStateInput")