You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/01/30 19:28:34 UTC

[2/2] helix git commit: Fix race conditions in the MBean management classes and correct the order in which listeners are added.

Fix race conditions in the MBean management classes. Correct the order in which listeners are added so that the controller listener is registered first.

Stabilize TestClusterStatusMonitorLifecycle accordingly.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f9ec24e9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f9ec24e9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f9ec24e9

Branch: refs/heads/master
Commit: f9ec24e994afbf834a4454dcb2cbe48af71e24dd
Parents: 0e0fcb9
Author: jiajunwang <er...@gmail.com>
Authored: Tue Jan 29 17:33:31 2019 -0800
Committer: Jiajun Wang <jj...@jjwang-ld2.linkedin.biz>
Committed: Wed Jan 30 11:26:21 2019 -0800

----------------------------------------------------------------------
 .../helix/controller/HelixControllerMain.java   |   2 +-
 .../manager/zk/ControllerManagerHelper.java     |   9 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java | 406 +++++++++++--------
 .../manager/TestConsecutiveZkSessionExpiry.java |   4 +-
 .../TestDistributedControllerManager.java       |   2 +-
 .../TestClusterEventStatusMonitor.java          |   8 +-
 .../TestClusterStatusMonitorLifecycle.java      |  19 +-
 7 files changed, 256 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
index 5265c24..d8e5a3f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HelixControllerMain.java
@@ -133,12 +133,12 @@ public class HelixControllerMain {
   public static void addListenersToController(HelixManager manager,
       GenericHelixController controller) {
     try {
+      manager.addControllerListener(controller);
       manager.addInstanceConfigChangeListener(controller);
       manager.addResourceConfigChangeListener(controller);
       manager.addClusterfigChangeListener(controller);
       manager.addLiveInstanceChangeListener(controller);
       manager.addIdealStateChangeListener(controller);
-      manager.addControllerListener(controller);
     } catch (ZkInterruptedException e) {
       logger
           .warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). "

http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index 1df53ec..b8202d9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -79,12 +79,12 @@ public class ControllerManagerHelper {
       /**
        * setup generic-controller
        */
+      _manager.addControllerListener(controller);
       _manager.addInstanceConfigChangeListener(controller);
       _manager.addResourceConfigChangeListener(controller);
       _manager.addClusterfigChangeListener(controller);
       _manager.addLiveInstanceChangeListener(controller);
       _manager.addIdealStateChangeListener(controller);
-      _manager.addControllerListener(controller);
     } catch (ZkInterruptedException e) {
       LOG.warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). "
           + e);
@@ -98,10 +98,11 @@ public class ControllerManagerHelper {
     /**
      * reset generic-controller
      */
-    _manager.removeListener(keyBuilder.instanceConfigs(), controller);
-    _manager.removeListener(keyBuilder.resourceConfigs(), controller);
-    _manager.removeListener(keyBuilder.liveInstances(), controller);
     _manager.removeListener(keyBuilder.idealStates(), controller);
+    _manager.removeListener(keyBuilder.liveInstances(), controller);
+    _manager.removeListener(keyBuilder.clusterConfig(), controller);
+    _manager.removeListener(keyBuilder.resourceConfigs(), controller);
+    _manager.removeListener(keyBuilder.instanceConfigs(), controller);
     _manager.removeListener(keyBuilder.controller(), controller);
 
     /**

http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index a980d3c..1455032 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -86,18 +87,20 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private boolean _rebalanceFailure = false;
   private AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
 
-  private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ResourceMonitor> _resourceMonitorMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, InstanceMonitor> _instanceMonitorMap =
+      new ConcurrentHashMap<>();
 
   // phaseName -> eventMonitor
-  protected final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMbeanMap =
+  protected final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMonitorMap =
       new ConcurrentHashMap<>();
 
   /**
-   * PerInstanceResource bean map: beanName->bean
+   * PerInstanceResource monitor map: beanName->monitor
    */
-  private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor> _perInstanceResourceMap =
-      new ConcurrentHashMap<>();
+  private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor>
+      _perInstanceResourceMonitorMap = new ConcurrentHashMap<>();
 
   private final Map<String, WorkflowMonitor> _perTypeWorkflowMonitorMap = new ConcurrentHashMap<>();
 
@@ -227,60 +230,60 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
    * @param disabledPartitions a map of instance name to the set of partitions disabled on it
    * @param tags a map of instance name to the set of tags on it
    */
-  public synchronized void setClusterInstanceStatus(Set<String> liveInstanceSet, Set<String> instanceSet,
+  public void setClusterInstanceStatus(Set<String> liveInstanceSet, Set<String> instanceSet,
       Set<String> disabledInstanceSet, Map<String, Map<String, List<String>>> disabledPartitions,
       Map<String, List<String>> oldDisabledPartitions, Map<String, Set<String>> tags) {
-    // Unregister beans for instances that are no longer configured
-    Set<String> toUnregister = Sets.newHashSet(_instanceMbeanMap.keySet());
-    toUnregister.removeAll(instanceSet);
-    try {
-      unregisterInstances(toUnregister);
-    } catch (MalformedObjectNameException e) {
-      LOG.error("Could not unregister instances from MBean server: " + toUnregister, e);
-    }
-
-    // Register beans for instances that are newly configured
-    Set<String> toRegister = Sets.newHashSet(instanceSet);
-    toRegister.removeAll(_instanceMbeanMap.keySet());
-    Set<InstanceMonitor> monitorsToRegister = Sets.newHashSet();
-    for (String instanceName : toRegister) {
-      InstanceMonitor bean = new InstanceMonitor(_clusterName, instanceName);
-      bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
-          oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
-          !disabledInstanceSet.contains(instanceName));
-      monitorsToRegister.add(bean);
-    }
-    try {
-      registerInstances(monitorsToRegister);
-    } catch (MalformedObjectNameException e) {
-      LOG.error("Could not register instances with MBean server: " + toRegister, e);
-    }
-
-    // Update all the sets
-    _instances = instanceSet;
-    _liveInstances = liveInstanceSet;
-    _disabledInstances = disabledInstanceSet;
-    _disabledPartitions = disabledPartitions;
-    _oldDisabledPartitions = oldDisabledPartitions;
-
-    // Update the instance MBeans
-    for (String instanceName : instanceSet) {
-      if (_instanceMbeanMap.containsKey(instanceName)) {
-        // Update the bean
-        InstanceMonitor bean = _instanceMbeanMap.get(instanceName);
-        String oldSensorName = bean.getSensorName();
+    synchronized (_instanceMonitorMap) {
+      // Unregister beans for instances that are no longer configured
+      Set<String> toUnregister = Sets.newHashSet(_instanceMonitorMap.keySet());
+      toUnregister.removeAll(instanceSet);
+      try {
+        unregisterInstances(toUnregister);
+      } catch (MalformedObjectNameException e) {
+        LOG.error("Could not unregister instances from MBean server: " + toUnregister, e);
+      }
+
+      // Register beans for instances that are newly configured
+      Set<String> toRegister = Sets.newHashSet(instanceSet);
+      toRegister.removeAll(_instanceMonitorMap.keySet());
+      Set<InstanceMonitor> monitorsToRegister = Sets.newHashSet();
+      for (String instanceName : toRegister) {
+        InstanceMonitor bean = new InstanceMonitor(_clusterName, instanceName);
         bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
-            oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
-            !disabledInstanceSet.contains(instanceName));
-
-        // If the sensor name changed, re-register the bean so that listeners won't miss it
-        String newSensorName = bean.getSensorName();
-        if (!oldSensorName.equals(newSensorName)) {
-          try {
-            unregisterInstances(Arrays.asList(instanceName));
-            registerInstances(Arrays.asList(bean));
-          } catch (MalformedObjectNameException e) {
-            LOG.error("Could not refresh registration with MBean server: " + instanceName, e);
+            oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName), !disabledInstanceSet.contains(instanceName));
+        monitorsToRegister.add(bean);
+      }
+      try {
+        registerInstances(monitorsToRegister);
+      } catch (MalformedObjectNameException e) {
+        LOG.error("Could not register instances with MBean server: " + toRegister, e);
+      }
+
+      // Update all the sets
+      _instances = instanceSet;
+      _liveInstances = liveInstanceSet;
+      _disabledInstances = disabledInstanceSet;
+      _disabledPartitions = disabledPartitions;
+      _oldDisabledPartitions = oldDisabledPartitions;
+
+      // Update the instance MBeans
+      for (String instanceName : instanceSet) {
+        if (_instanceMonitorMap.containsKey(instanceName)) {
+          // Update the bean
+          InstanceMonitor bean = _instanceMonitorMap.get(instanceName);
+          String oldSensorName = bean.getSensorName();
+          bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
+              oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName), !disabledInstanceSet.contains(instanceName));
+
+          // If the sensor name changed, re-register the bean so that listeners won't miss it
+          String newSensorName = bean.getSensorName();
+          if (!oldSensorName.equals(newSensorName)) {
+            try {
+              unregisterInstances(Arrays.asList(instanceName));
+              registerInstances(Arrays.asList(bean));
+            } catch (MalformedObjectNameException e) {
+              LOG.error("Could not refresh registration with MBean server: " + instanceName, e);
+            }
           }
         }
       }
@@ -302,12 +305,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
   private ClusterEventMonitor getOrCreateClusterEventMonitor(String phase) {
     try {
-      if (!_clusterEventMbeanMap.containsKey(phase)) {
-        synchronized (this) {
-          if (!_clusterEventMbeanMap.containsKey(phase)) {
+      if (!_clusterEventMonitorMap.containsKey(phase)) {
+        synchronized (_clusterEventMonitorMap) {
+          if (!_clusterEventMonitorMap.containsKey(phase)) {
             ClusterEventMonitor monitor = new ClusterEventMonitor(this, phase);
             monitor.register();
-            _clusterEventMbeanMap.put(phase, monitor);
+            _clusterEventMonitorMap.put(phase, monitor);
           }
         }
       }
@@ -316,7 +319,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
           + " and phase type: " + phase, e);
     }
 
-    return _clusterEventMbeanMap.get(phase);
+    return _clusterEventMonitorMap.get(phase);
   }
 
   /**
@@ -349,13 +352,15 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
     // Update message count per instance and per resource
     for (String instance : messageCountPerInstance.keySet()) {
-      if (_instanceMbeanMap.containsKey(instance)) {
-        _instanceMbeanMap.get(instance).increaseMessageCount(messageCountPerInstance.get(instance));
+      InstanceMonitor instanceMonitor = _instanceMonitorMap.get(instance);
+      if (instanceMonitor != null) {
+        instanceMonitor.increaseMessageCount(messageCountPerInstance.get(instance));
       }
     }
     for (String resource : messageCountPerResource.keySet()) {
-      if (_resourceMbeanMap.containsKey(resource)) {
-        _resourceMbeanMap.get(resource).increaseMessageCount(messageCountPerResource.get(resource));
+      ResourceMonitor resourceMonitor = _resourceMonitorMap.get(resource);
+      if (resourceMonitor != null) {
+        resourceMonitor.increaseMessageCount(messageCountPerResource.get(resource));
       }
     }
   }
@@ -389,9 +394,10 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
         }
       }
     }
-    synchronized (_perInstanceResourceMap) {
+    synchronized (_perInstanceResourceMonitorMap) {
       // Unregister beans for per-instance resources that no longer exist
-      Set<PerInstanceResourceMonitor.BeanName> toUnregister = Sets.newHashSet(_perInstanceResourceMap.keySet());
+      Set<PerInstanceResourceMonitor.BeanName> toUnregister = Sets.newHashSet(
+          _perInstanceResourceMonitorMap.keySet());
       toUnregister.removeAll(beanMap.keySet());
       try {
         unregisterPerInstanceResources(toUnregister);
@@ -400,7 +406,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       }
       // Register beans for per-instance resources that are newly configured
       Set<PerInstanceResourceMonitor.BeanName> toRegister = Sets.newHashSet(beanMap.keySet());
-      toRegister.removeAll(_perInstanceResourceMap.keySet());
+      toRegister.removeAll(_perInstanceResourceMonitorMap.keySet());
       Set<PerInstanceResourceMonitor> monitorsToRegister = Sets.newHashSet();
       for (PerInstanceResourceMonitor.BeanName beanName : toRegister) {
         PerInstanceResourceMonitor bean =
@@ -416,8 +422,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
         LOG.error("Fail to register per-instance resource with MBean server: " + toRegister, e);
       }
       // Update existing beans
-      for (PerInstanceResourceMonitor.BeanName beanName : _perInstanceResourceMap.keySet()) {
-        PerInstanceResourceMonitor bean = _perInstanceResourceMap.get(beanName);
+      for (PerInstanceResourceMonitor.BeanName beanName : _perInstanceResourceMonitorMap.keySet()) {
+        PerInstanceResourceMonitor bean = _perInstanceResourceMonitorMap.get(beanName);
         String stateModelDefName = resourceMap.get(beanName.resourceName()).getStateModelDefRef();
         InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
         bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()), stateModelDefMap.get(stateModelDefName));
@@ -431,9 +437,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
    */
   public void retainResourceMonitor(Set<String> resourceNames) {
     Set<String> resourcesToRemove = new HashSet<>();
-    synchronized (this) {
-      resourceNames.retainAll(_resourceMbeanMap.keySet());
-      resourcesToRemove.addAll(_resourceMbeanMap.keySet());
+    synchronized (_resourceMonitorMap) {
+      resourceNames.retainAll(_resourceMonitorMap.keySet());
+      resourcesToRemove.addAll(_resourceMonitorMap.keySet());
     }
     resourcesToRemove.removeAll(resourceNames);
 
@@ -446,7 +452,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
     try {
       unregisterResources(resourcesToRemove);
-    } catch (MalformedObjectNameException e) {
+    } catch (Exception e) {
       LOG.error(String.format("Could not unregister beans for the following resources: %s",
           Joiner.on(',').join(resourcesToRemove)), e);
     }
@@ -466,7 +472,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public synchronized void updateMissingTopStateDurationStats(String resourceName,
+  public void updateMissingTopStateDurationStats(String resourceName,
       long totalDuration, long helixLatency, boolean isGraceful, boolean succeeded) {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
@@ -490,13 +496,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
   private ResourceMonitor getOrCreateResourceMonitor(String resourceName) {
     try {
-      if (!_resourceMbeanMap.containsKey(resourceName)) {
-        synchronized (this) {
-          if (!_resourceMbeanMap.containsKey(resourceName)) {
+      if (!_resourceMonitorMap.containsKey(resourceName)) {
+        synchronized (_resourceMonitorMap) {
+          if (!_resourceMonitorMap.containsKey(resourceName)) {
             String beanName = getResourceBeanName(resourceName);
             ResourceMonitor bean =
                 new ResourceMonitor(_clusterName, resourceName, getObjectName(beanName));
-            _resourceMbeanMap.put(resourceName, bean);
+            _resourceMonitorMap.put(resourceName, bean);
           }
         }
       }
@@ -504,11 +510,11 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       LOG.error("Fail to register resource mbean, resource: " + resourceName);
     }
 
-    return _resourceMbeanMap.get(resourceName);
+    return _resourceMonitorMap.get(resourceName);
   }
 
   public void resetMaxMissingTopStateGauge() {
-    for (ResourceMonitor monitor : _resourceMbeanMap.values()) {
+    for (ResourceMonitor monitor : _resourceMonitorMap.values()) {
       monitor.resetMaxTopStateHandoffGauge();
     }
   }
@@ -529,19 +535,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   public void reset() {
     LOG.info("Reset ClusterStatusMonitor");
     try {
-      unregisterResources(_resourceMbeanMap.keySet());
-
-      _resourceMbeanMap.clear();
+      unregisterAllResources();
       _instanceMsgQueueSizes.clear();
-
-      unregisterInstances(_instanceMbeanMap.keySet());
-      _instanceMbeanMap.clear();
-
-      unregisterPerInstanceResources(_perInstanceResourceMap.keySet());
+      unregisterAllInstances();
+      unregisterAllPerInstanceResources();
       unregister(getObjectName(clusterBeanName()));
-      unregisterEventMonitors(_clusterEventMbeanMap.values());
-      unregisterWorkflows(_perTypeWorkflowMonitorMap.keySet());
-      unregisterJobs(_perTypeJobMonitorMap.keySet());
+      unregisterAllEventMonitors();
+      unregisterAllWorkflowsMonitor();
+      unregisterAllJobs();
 
       _rebalanceFailure = false;
     } catch (Exception e) {
@@ -550,8 +551,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   public void refreshWorkflowsStatus(TaskDriver driver) {
-    for (WorkflowMonitor workflowMonitor : _perTypeWorkflowMonitorMap.values()) {
-      workflowMonitor.resetGauges();
+    for (Map.Entry<String, WorkflowMonitor> workflowMonitor : _perTypeWorkflowMonitorMap
+        .entrySet()) {
+      workflowMonitor.getValue().resetGauges();
     }
 
     Map<String, WorkflowConfig> workflowConfigMap = driver.getWorkflows();
@@ -571,13 +573,19 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   public void updateWorkflowCounters(WorkflowConfig workflowConfig, TaskState to, long latency) {
     String workflowType = workflowConfig.getWorkflowType();
     workflowType = preProcessWorkflow(workflowType);
-    _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowCounters(to, latency);
+    WorkflowMonitor workflowMonitor = _perTypeWorkflowMonitorMap.get(workflowType);
+    if (workflowMonitor != null) {
+      workflowMonitor.updateWorkflowCounters(to, latency);
+    }
   }
 
   private void updateWorkflowGauges(WorkflowConfig workflowConfig, TaskState current) {
     String workflowType = workflowConfig.getWorkflowType();
     workflowType = preProcessWorkflow(workflowType);
-    _perTypeWorkflowMonitorMap.get(workflowType).updateWorkflowGauges(current);
+    WorkflowMonitor workflowMonitor = _perTypeWorkflowMonitorMap.get(workflowType);
+    if (workflowMonitor != null) {
+      workflowMonitor.updateWorkflowGauges(current);
+    }
   }
 
   private String preProcessWorkflow(String workflowType) {
@@ -585,21 +593,23 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       workflowType = DEFAULT_WORKFLOW_JOB_TYPE;
     }
 
-    if (!_perTypeWorkflowMonitorMap.containsKey(workflowType)) {
-      WorkflowMonitor monitor = new WorkflowMonitor(_clusterName, workflowType);
-      try {
-        registerWorkflow(monitor);
-      } catch (MalformedObjectNameException e) {
-        LOG.error("Failed to register object for workflow type : " + workflowType, e);
+    synchronized (_perTypeWorkflowMonitorMap) {
+      if (!_perTypeWorkflowMonitorMap.containsKey(workflowType)) {
+        WorkflowMonitor monitor = new WorkflowMonitor(_clusterName, workflowType);
+        try {
+          registerWorkflow(monitor);
+        } catch (MalformedObjectNameException e) {
+          LOG.error("Failed to register object for workflow type : " + workflowType, e);
+        }
+        _perTypeWorkflowMonitorMap.put(workflowType, monitor);
       }
-      _perTypeWorkflowMonitorMap.put(workflowType, monitor);
     }
     return workflowType;
   }
 
   public void refreshJobsStatus(TaskDriver driver) {
-    for (JobMonitor jobMonitor : _perTypeJobMonitorMap.values()) {
-      jobMonitor.resetJobGauge();
+    for (Map.Entry<String, JobMonitor> jobMonitor : _perTypeJobMonitorMap.entrySet()) {
+      jobMonitor.getValue().resetJobGauge();
     }
     for (String workflow : driver.getWorkflows().keySet()) {
       if (workflow.isEmpty()) {
@@ -626,14 +636,20 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   public void updateJobCounters(JobConfig jobConfig, TaskState to, long latency) {
     String jobType = jobConfig.getJobType();
     jobType = preProcessJobMonitor(jobType);
-    _perTypeJobMonitorMap.get(jobType).updateJobCounters(to, latency);
+    JobMonitor jobMonitor = _perTypeJobMonitorMap.get(jobType);
+    if (jobMonitor != null) {
+      jobMonitor.updateJobCounters(to, latency);
+    }
   }
 
   private void updateJobGauges(String jobType, TaskState current) {
     // When first time for WorkflowRebalancer call, jobconfig may not ready.
     // Thus only check it for gauge.
     jobType = preProcessJobMonitor(jobType);
-    _perTypeJobMonitorMap.get(jobType).updateJobGauge(current);
+    JobMonitor jobMonitor = _perTypeJobMonitorMap.get(jobType);
+    if (jobMonitor != null) {
+      jobMonitor.updateJobGauge(current);
+    }
   }
 
   private String preProcessJobMonitor(String jobType) {
@@ -641,114 +657,154 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       jobType = DEFAULT_WORKFLOW_JOB_TYPE;
     }
 
-    if (!_perTypeJobMonitorMap.containsKey(jobType)) {
-      JobMonitor monitor = new JobMonitor(_clusterName, jobType);
-      try {
-        registerJob(monitor);
-      } catch (MalformedObjectNameException e) {
-        LOG.error("Failed to register job type : " + jobType, e);
+    synchronized (_perTypeJobMonitorMap) {
+      if (!_perTypeJobMonitorMap.containsKey(jobType)) {
+        JobMonitor monitor = new JobMonitor(_clusterName, jobType);
+        try {
+          registerJob(monitor);
+        } catch (MalformedObjectNameException e) {
+          LOG.error("Failed to register job type : " + jobType, e);
+        }
+        _perTypeJobMonitorMap.put(jobType, monitor);
       }
-      _perTypeJobMonitorMap.put(jobType, monitor);
     }
     return jobType;
   }
 
-  private synchronized void registerInstances(Collection<InstanceMonitor> instances)
+  private void registerInstances(Collection<InstanceMonitor> instances)
       throws MalformedObjectNameException {
-    for (InstanceMonitor monitor : instances) {
-      String instanceName = monitor.getInstanceName();
-      String beanName = getInstanceBeanName(instanceName);
-      register(monitor, getObjectName(beanName));
-      _instanceMbeanMap.put(instanceName, monitor);
+    synchronized (_instanceMonitorMap) {
+      for (InstanceMonitor monitor : instances) {
+        String instanceName = monitor.getInstanceName();
+        String beanName = getInstanceBeanName(instanceName);
+        register(monitor, getObjectName(beanName));
+        _instanceMonitorMap.put(instanceName, monitor);
+      }
+    }
+  }
+
+  private void unregisterAllInstances() throws MalformedObjectNameException {
+    synchronized (_instanceMonitorMap) {
+      unregisterInstances(_instanceMonitorMap.keySet());
+    }
+  }
+
+  private void unregisterInstances(Collection<String> instances) throws MalformedObjectNameException {
+    synchronized (_instanceMonitorMap) {
+      for (String instanceName : instances) {
+        String beanName = getInstanceBeanName(instanceName);
+        unregister(getObjectName(beanName));
+      }
+      _instanceMonitorMap.keySet().removeAll(instances);
+    }
+  }
+
+  private void registerResources(Collection<String> resources) throws JMException {
+    synchronized (_resourceMonitorMap) {
+      for (String resourceName : resources) {
+        ResourceMonitor monitor = _resourceMonitorMap.get(resourceName);
+        if (monitor != null) {
+          monitor.register();
+        }
+      }
     }
   }
 
-  private synchronized void unregisterInstances(Collection<String> instances) throws MalformedObjectNameException {
-    for (String instanceName : instances) {
-      String beanName = getInstanceBeanName(instanceName);
-      unregister(getObjectName(beanName));
+  private void unregisterAllResources() {
+    synchronized (_resourceMonitorMap) {
+      unregisterResources(_resourceMonitorMap.keySet());
     }
-    _instanceMbeanMap.keySet().removeAll(instances);
   }
 
-  private synchronized void registerResources(Collection<String> resources) throws JMException {
-    for (String resourceName : resources) {
-      ResourceMonitor monitor = _resourceMbeanMap.get(resourceName);
-      if (monitor != null) {
-        monitor.register();
+  private void unregisterResources(Collection<String> resources) {
+    synchronized (_resourceMonitorMap) {
+      for (String resourceName : resources) {
+        ResourceMonitor monitor = _resourceMonitorMap.get(resourceName);
+        if (monitor != null) {
+          monitor.unregister();
+        }
       }
+      _resourceMonitorMap.keySet().removeAll(resources);
     }
   }
 
-  private synchronized void unregisterResources(Collection<String> resources) throws MalformedObjectNameException {
-    for (String resourceName : resources) {
-      ResourceMonitor monitor = _resourceMbeanMap.get(resourceName);
-      if (monitor != null) {
+  private void unregisterAllEventMonitors() {
+    synchronized (_clusterEventMonitorMap) {
+      for (ClusterEventMonitor monitor : _clusterEventMonitorMap.values()) {
         monitor.unregister();
       }
+      _clusterEventMonitorMap.clear();
     }
-    _resourceMbeanMap.keySet().removeAll(resources);
   }
 
-  private synchronized void unregisterEventMonitors(Collection<ClusterEventMonitor> monitors)
+  private void registerPerInstanceResources(Collection<PerInstanceResourceMonitor> monitors)
       throws MalformedObjectNameException {
-    for (ClusterEventMonitor monitor : monitors) {
-      monitor.unregister();
+    synchronized (_perInstanceResourceMonitorMap) {
+      for (PerInstanceResourceMonitor monitor : monitors) {
+        String instanceName = monitor.getInstanceName();
+        String resourceName = monitor.getResourceName();
+        String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
+        register(monitor, getObjectName(beanName));
+        _perInstanceResourceMonitorMap
+            .put(new PerInstanceResourceMonitor.BeanName(instanceName, resourceName), monitor);
+      }
     }
-    _resourceMbeanMap.keySet().removeAll(monitors);
   }
 
-  private synchronized void registerPerInstanceResources(Collection<PerInstanceResourceMonitor> monitors)
+  private void unregisterAllPerInstanceResources()
       throws MalformedObjectNameException {
-    for (PerInstanceResourceMonitor monitor : monitors) {
-      String instanceName = monitor.getInstanceName();
-      String resourceName = monitor.getResourceName();
-      String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
-      register(monitor, getObjectName(beanName));
-      _perInstanceResourceMap.put(
-          new PerInstanceResourceMonitor.BeanName(instanceName, resourceName), monitor);
+    synchronized (_perInstanceResourceMonitorMap) {
+      unregisterPerInstanceResources(_perInstanceResourceMonitorMap.keySet());
     }
   }
 
-  private synchronized void unregisterPerInstanceResources(Collection<PerInstanceResourceMonitor.BeanName> beanNames)
+  private void unregisterPerInstanceResources(Collection<PerInstanceResourceMonitor.BeanName> beanNames)
       throws MalformedObjectNameException {
-    for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
-      unregister(getObjectName(
-          getPerInstanceResourceBeanName(beanName.instanceName(), beanName.resourceName())));
+    synchronized (_perInstanceResourceMonitorMap) {
+      for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
+        unregister(getObjectName(getPerInstanceResourceBeanName(beanName.instanceName(), beanName.resourceName())));
+      }
+      _perInstanceResourceMonitorMap.keySet().removeAll(beanNames);
     }
-    _perInstanceResourceMap.keySet().removeAll(beanNames);
   }
 
-  private synchronized void registerWorkflow(WorkflowMonitor workflowMonitor) throws MalformedObjectNameException {
+  private void registerWorkflow(WorkflowMonitor workflowMonitor) throws MalformedObjectNameException {
     String workflowBeanName = getWorkflowBeanName(workflowMonitor.getWorkflowType());
     register(workflowMonitor, getObjectName(workflowBeanName));
   }
 
-  private synchronized void unregisterWorkflows(Collection<String> workflowMonitors)
+  private void unregisterAllWorkflowsMonitor()
       throws MalformedObjectNameException {
-    for (String workflowMonitor : workflowMonitors) {
-      String workflowBeanName = getWorkflowBeanName(workflowMonitor);
-      unregister(getObjectName(workflowBeanName));
-      _perTypeWorkflowMonitorMap.remove(workflowMonitor);
+    synchronized (_perTypeWorkflowMonitorMap) {
+      Iterator<Map.Entry<String, WorkflowMonitor>> workflowIter =
+          _perTypeWorkflowMonitorMap.entrySet().iterator();
+      while (workflowIter.hasNext()) {
+        Map.Entry<String, WorkflowMonitor> workflowEntry = workflowIter.next();
+        unregister(getObjectName(getWorkflowBeanName(workflowEntry.getKey())));
+        workflowIter.remove();
+      }
     }
   }
 
-  private synchronized void registerJob(JobMonitor jobMonitor) throws MalformedObjectNameException {
+  private void registerJob(JobMonitor jobMonitor) throws MalformedObjectNameException {
     String jobBeanName = getJobBeanName(jobMonitor.getJobType());
     register(jobMonitor, getObjectName(jobBeanName));
   }
 
-  private synchronized void unregisterJobs(Collection<String> jobMonitors) throws MalformedObjectNameException {
-    for (String jobMonitor : jobMonitors) {
-      String jobBeanName = getJobBeanName(jobMonitor);
-      unregister(getObjectName(jobBeanName));
-      _perTypeJobMonitorMap.remove(jobMonitor);
+  private void unregisterAllJobs() throws MalformedObjectNameException {
+    synchronized (_perTypeJobMonitorMap) {
+      Iterator<Map.Entry<String, JobMonitor>> jobIter = _perTypeJobMonitorMap.entrySet().iterator();
+      while (jobIter.hasNext()) {
+        Map.Entry<String, JobMonitor> jobEntry = jobIter.next();
+        unregister(getObjectName(getJobBeanName(jobEntry.getKey())));
+        jobIter.remove();
+      }
     }
   }
 
   // For test only
   protected ResourceMonitor getResourceMonitor(String resourceName) {
-    return _resourceMbeanMap.get(resourceName);
+    return _resourceMonitorMap.get(resourceName);
   }
 
   public String clusterBeanName() {
@@ -848,13 +904,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
   @Override
   public long getTotalResourceGauge() {
-    return _resourceMbeanMap.size();
+    return _resourceMonitorMap.size();
   }
 
   @Override
   public long getTotalPartitionGauge() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getPartitionGauge();
     }
     return total;
@@ -863,7 +919,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getErrorPartitionGauge() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getErrorPartitionGauge();
     }
     return total;
@@ -872,7 +928,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getMissingTopStatePartitionGauge() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getMissingTopStatePartitionGauge();
     }
     return total;
@@ -881,7 +937,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getMissingMinActiveReplicaPartitionGauge() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getMissingMinActiveReplicaPartitionGauge();
     }
     return total;
@@ -890,7 +946,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getMissingReplicaPartitionGauge() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getMissingReplicaPartitionGauge();
     }
     return total;
@@ -899,7 +955,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getDifferenceWithIdealStateGauge() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getDifferenceWithIdealStateGauge();
     }
     return total;
@@ -908,7 +964,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getStateTransitionCounter() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getTotalMessageReceived();
     }
     return total;
@@ -917,7 +973,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   @Override
   public long getPendingStateTransitionGuage() {
     long total = 0;
-    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMonitorMap.entrySet()) {
       total += entry.getValue().getNumPendingStateTransitionGauge();
     }
     return total;

http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
index bd910e9..391bdd5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
@@ -243,8 +243,8 @@ public class TestConsecutiveZkSessionExpiry extends ZkUnitTestBase {
     Assert
         .assertEquals(
             handlers.size(),
-            2,
-            "Distributed controller should have 2 handler (message) after lose leadership, but was "
+            1,
+            "Distributed controller should have 1 handler (message) after lose leadership, but was "
                 + handlers.size());
 
     // clean up

http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
index 0a1b9fb..909e44c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
@@ -146,7 +146,7 @@ public class TestDistributedControllerManager extends ZkTestBase {
     Assert
         .assertEquals(
             handlers.size(),
-            2,
+            1,
             "Distributed controller should have 1 handler (message) after lose leadership, but was "
                 + handlers.size());
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
index 5f0029f..7966f03 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterEventStatusMonitor.java
@@ -49,8 +49,8 @@ public class TestClusterEventStatusMonitor {
       super(clusterName);
       active();
     }
-    public ConcurrentHashMap<String, ClusterEventMonitor> getClusterEventMBean() {
-      return _clusterEventMbeanMap;
+    public ConcurrentHashMap<String, ClusterEventMonitor> getClusterEventMonitors() {
+      return _clusterEventMonitorMap;
     }
   }
 
@@ -149,11 +149,11 @@ public class TestClusterEventStatusMonitor {
 
   private void addTestEventMonitor(ClusterStatusMonitorForTest monitor, String phaseName) throws
       JMException {
-    ConcurrentHashMap<String, ClusterEventMonitor> mbean = monitor.getClusterEventMBean();
+    ConcurrentHashMap<String, ClusterEventMonitor> monitors = monitor.getClusterEventMonitors();
     ClusterEventMonitor eventMonitor = new ClusterEventMonitor(monitor, phaseName,
         TEST_SLIDING_WINDOW_MS);
     eventMonitor.register();
-    mbean.put(phaseName, eventMonitor);
+    monitors.put(phaseName, eventMonitor);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/f9ec24e9/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 7eb2b8d..6156666 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -233,7 +233,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         mbeans.addAll(newMbeans);
         return newMbeans.size() == (previousMBeanCount - 2);
       }
-    }, 10000));
+    }, 3000));
 
     HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
     String firstControllerName =
@@ -247,8 +247,13 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
     }
     firstController.disconnect();
 
+    ZkHelixClusterVerifier controllerClusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(_controllerClusterName).setZkClient(_gZkClient)
+            .build();
+    Assert.assertTrue(controllerClusterVerifier.verifyByPolling(), "Controller cluster was not converged");
+
     // 1 controller goes away
-    // 1 message queue mbean, 1 PerInstanceResource mbean, and one message queue mbean
+    // 1 message queue mbean, 1 PerInstanceResource mbean, and one event mbean
     final int previousMBeanCount2 = mbeans.size();
     Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
       @Override public boolean verify() throws Exception {
@@ -258,7 +263,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         mbeans.addAll(newMbeans);
         return newMbeans.size() == (previousMBeanCount2 - 3);
       }
-    }, 10000));
+    }, 5000));
 
     String instanceName = "localhost0_" + (12918 + 0);
     _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
@@ -276,7 +281,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         mbeans.addAll(newMbeans);
         return newMbeans.size() == (previousMBeanCount3 + 2);
       }
-    }, 10000));
+    }, 3000));
 
     // Add a resource
     // Register 1 resource mbean
@@ -299,7 +304,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         mbeans.addAll(newMbeans);
         return newMbeans.size() == (previousMBeanCount4 + _participants.length + 1);
       }
-    }, 10000));
+    }, 3000));
 
     // Remove a resource
     // No change in instance/resource mbean
@@ -315,7 +320,7 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         mbeans.addAll(newMbeans);
         return newMbeans.size() == (previousMBeanCount5 - (_participants.length + 1));
       }
-    }, 10000));
+    }, 3000));
 
     // Cleanup controllers then MBeans should all be removed.
     cleanupControllers();
@@ -331,6 +336,6 @@ public class TestClusterStatusMonitorLifecycle extends ZkTestBase {
         return ManagementFactory.getPlatformMBeanServer()
             .queryMBeans(new ObjectName("ClusterStatus:*"), exp2).isEmpty();
       }
-    }, 10000));
+    }, 3000));
   }
 }