You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/05/14 19:25:54 UTC

git commit: [HELIX-444] add per-participant partition count gauges to helix, rb=21419

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.2-release 7b5250a34 -> 9661fd2f8


[HELIX-444] add per-participant partition count gauges to helix, rb=21419


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9661fd2f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9661fd2f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9661fd2f

Branch: refs/heads/helix-0.6.2-release
Commit: 9661fd2f832c904377959ece434e769c34f87f99
Parents: 7b5250a
Author: zzhang <zz...@apache.org>
Authored: Wed May 14 10:25:31 2014 -0700
Committer: zzhang <zz...@apache.org>
Committed: Wed May 14 10:25:31 2014 -0700

----------------------------------------------------------------------
 .../stages/BestPossibleStateCalcStage.java      |   8 +
 .../stages/BestPossibleStateOutput.java         |  44 ++--
 .../controller/stages/ClusterDataCache.java     |  39 +---
 .../stages/ExternalViewComputeStage.java        |   2 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java | 181 +++++++++++++---
 .../monitoring/mbeans/InstanceMonitor.java      |   4 +-
 .../mbeans/PerInstanceResourceMonitor.java      | 146 +++++++++++++
 .../mbeans/PerInstanceResourceMonitorMBean.java |  34 +++
 .../monitoring/mbeans/ResourceMonitor.java      |  21 +-
 .../TestClusterStatusMonitorLifecycle.java      |  40 ++--
 .../mbeans/TestClusterStatusMonitor.java        | 210 +++++++++++--------
 .../monitoring/mbeans/TestResourceMonitor.java  | 127 +++--------
 12 files changed, 582 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 458218c..a4aa4a3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -32,6 +32,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.util.HelixUtil;
 import org.apache.log4j.Logger;
 
@@ -61,6 +62,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
         compute(event, resourceMap, currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
 
+    ClusterStatusMonitor clusterStatusMonitor =
+        (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.setPerInstanceResourceStatus(bestPossibleStateOutput,
+          cache.getInstanceConfigMap(), resourceMap, cache.getStateModelDefMap());
+    }
+
     long endTime = System.currentTimeMillis();
     logger.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
index 3da9bef..3dc4568 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateOutput.java
@@ -22,36 +22,52 @@ package org.apache.helix.controller.stages;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.helix.model.Partition;
 
 public class BestPossibleStateOutput {
-  // resource->partition->instance->state
-  Map<String, Map<Partition, Map<String, String>>> _dataMap;
+  // Map of resource->partition->instance->state
+  Map<String, Map<Partition, Map<String, String>>> _stateMap;
 
   public BestPossibleStateOutput() {
-    _dataMap = new HashMap<String, Map<Partition, Map<String, String>>>();
+    _stateMap = new HashMap<String, Map<Partition, Map<String, String>>>();
   }
 
-  public void setState(String resourceName, Partition resource,
+  public Set<String> resourceSet() {
+    return _stateMap.keySet();
+  }
+
+  public void setState(String resourceName, Partition partition,
       Map<String, String> bestInstanceStateMappingForResource) {
-    if (!_dataMap.containsKey(resourceName)) {
-      _dataMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+    if (!_stateMap.containsKey(resourceName)) {
+      _stateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+    }
+    Map<Partition, Map<String, String>> map = _stateMap.get(resourceName);
+    map.put(partition, bestInstanceStateMappingForResource);
+  }
+
+  public void setState(String resourceName, Partition partition, String instance, String state) {
+    if (!_stateMap.containsKey(resourceName)) {
+      _stateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+    }
+
+    if (!_stateMap.get(resourceName).containsKey(partition)) {
+      _stateMap.get(resourceName).put(partition, new HashMap<String, String>());
     }
-    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
-    map.put(resource, bestInstanceStateMappingForResource);
+    _stateMap.get(resourceName).get(partition).put(instance, state);
   }
 
-  public Map<String, String> getInstanceStateMap(String resourceName, Partition resource) {
-    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
+  public Map<String, String> getInstanceStateMap(String resourceName, Partition partition) {
+    Map<Partition, Map<String, String>> map = _stateMap.get(resourceName);
     if (map != null) {
-      return map.get(resource);
+      return map.get(partition);
     }
     return Collections.emptyMap();
   }
 
   public Map<Partition, Map<String, String>> getResourceMap(String resourceName) {
-    Map<Partition, Map<String, String>> map = _dataMap.get(resourceName);
+    Map<Partition, Map<String, String>> map = _stateMap.get(resourceName);
     if (map != null) {
       return map;
     }
@@ -59,11 +75,11 @@ public class BestPossibleStateOutput {
   }
 
   public Map<String, Map<Partition, Map<String, String>>> getStateMap() {
-    return _dataMap;
+    return _stateMap;
   }
 
   @Override
   public String toString() {
-    return _dataMap.toString();
+    return _stateMap.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 2bf9f11..427873f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -312,37 +312,6 @@ public class ClusterDataCache {
     }
   }
 
-  // public HealthStat getGlobalStats()
-  // {
-  // return _globalStats;
-  // }
-  //
-  // public PersistentStats getPersistentStats()
-  // {
-  // return _persistentStats;
-  // }
-  //
-  // public Alerts getAlerts()
-  // {
-  // return _alerts;
-  // }
-  //
-  // public AlertStatus getAlertStatus()
-  // {
-  // return _alertStatus;
-  // }
-  //
-  // public Map<String, HealthStat> getHealthStats(String instanceName)
-  // {
-  // Map<String, HealthStat> map = _healthStatMap.get(instanceName);
-  // if (map != null)
-  // {
-  // return map;
-  // } else
-  // {
-  // return Collections.emptyMap();
-  // }
-  // }
   /**
    * Provides the state model definition for a given state model
    * @param stateModelDefRef
@@ -354,6 +323,14 @@ public class ClusterDataCache {
   }
 
   /**
+   * Provides all state model definitions
+   * @return state model definition map
+   */
+  public Map<String, StateModelDefinition> getStateModelDefMap() {
+    return _stateModelDefMap;
+  }
+
+  /**
    * Provides the idealstate for a given resource
    * @param resourceName
    * @return

http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 35ef177..7ef4584 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -111,7 +111,7 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
         if (clusterStatusMonitor != null
             && !idealState.getStateModelDefRef().equalsIgnoreCase(
                 DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-          clusterStatusMonitor.onExternalViewChange(view,
+          clusterStatusMonitor.setResourceStatus(view,
               cache._idealStateMap.get(view.getResourceName()));
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index b468856..5389e76 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -32,8 +33,13 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Sets;
@@ -41,15 +47,15 @@ import com.google.common.collect.Sets;
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private static final Logger LOG = Logger.getLogger(ClusterStatusMonitor.class);
 
-  static final String CLUSTER_STATUS_KEY = "ClusterStatus";
+  public static final String CLUSTER_STATUS_KEY = "ClusterStatus";
   static final String MESSAGE_QUEUE_STATUS_KEY = "MessageQueueStatus";
   static final String RESOURCE_STATUS_KEY = "ResourceStatus";
-  static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
+  public static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
   static final String CLUSTER_DN_KEY = "cluster";
   static final String RESOURCE_DN_KEY = "resourceName";
   static final String INSTANCE_DN_KEY = "instanceName";
 
-  static final String DEFAULT_TAG = "DEFAULT";
+  public static final String DEFAULT_TAG = "DEFAULT";
 
   private final String _clusterName;
   private final MBeanServer _beanServer;
@@ -68,20 +74,27 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap =
       new ConcurrentHashMap<String, InstanceMonitor>();
 
+  /**
+   * PerInstanceResource bean map: beanName->bean
+   */
+  private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor> _perInstanceResourceMap =
+      new ConcurrentHashMap<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor>();
+
   public ClusterStatusMonitor(String clusterName) {
     _clusterName = clusterName;
     _beanServer = ManagementFactory.getPlatformMBeanServer();
     try {
-      register(this, getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+      register(this, getObjectName(clusterBeanName()));
     } catch (Exception e) {
-      LOG.error("Register self failed.", e);
+      LOG.error("Fail to regiter ClusterStatusMonitor", e);
     }
   }
 
   public ObjectName getObjectName(String name) throws MalformedObjectNameException {
-    return new ObjectName(CLUSTER_STATUS_KEY + ": " + name);
+    return new ObjectName(String.format("%s: %s", CLUSTER_STATUS_KEY, name));
   }
 
+  // TODO remove getBeanName()?
   // Used by other external JMX consumers like ingraph
   public String getBeanName() {
     return CLUSTER_STATUS_KEY + " " + _clusterName;
@@ -144,21 +157,21 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
 
     try {
-      LOG.info("Registering " + name.toString());
+      LOG.info("Register MBean: " + name);
       _beanServer.registerMBean(bean, name);
     } catch (Exception e) {
-      LOG.warn("Could not register MBean" + name, e);
+      LOG.warn("Could not register MBean: " + name, e);
     }
   }
 
   private void unregister(ObjectName name) {
     try {
       if (_beanServer.isRegistered(name)) {
-        LOG.info("Unregistering " + name.toString());
+        LOG.info("Unregister MBean: " + name);
         _beanServer.unregisterMBean(name);
       }
     } catch (Exception e) {
-      LOG.warn("Could not unregister MBean" + name, e);
+      LOG.warn("Could not unregister MBean: " + name, e);
     }
   }
 
@@ -227,28 +240,100 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public void onExternalViewChange(ExternalView externalView, IdealState idealState) {
+  /**
+   * Update gauges for resource at instance level
+   * @param bestPossibleStates
+   * @param resourceMap
+   * @param stateModelDefMap
+   */
+  public void setPerInstanceResourceStatus(BestPossibleStateOutput bestPossibleStates,
+      Map<String, InstanceConfig> instanceConfigMap, Map<String, Resource> resourceMap,
+      Map<String, StateModelDefinition> stateModelDefMap) {
+
+    // Convert to perInstanceResource beanName->partition->state
+    Map<PerInstanceResourceMonitor.BeanName, Map<Partition, String>> beanMap =
+        new HashMap<PerInstanceResourceMonitor.BeanName, Map<Partition, String>>();
+
+    for (String resource : bestPossibleStates.resourceSet()) {
+      Map<Partition, Map<String, String>> partitionStateMap =
+          bestPossibleStates.getResourceMap(resource);
+      for (Partition partition : partitionStateMap.keySet()) {
+        Map<String, String> instanceStateMap = partitionStateMap.get(partition);
+        for (String instance : instanceStateMap.keySet()) {
+          String state = instanceStateMap.get(instance);
+          PerInstanceResourceMonitor.BeanName beanName =
+              new PerInstanceResourceMonitor.BeanName(instance, resource);
+          if (!beanMap.containsKey(beanName)) {
+            beanMap.put(beanName, new HashMap<Partition, String>());
+          }
+          beanMap.get(beanName).put(partition, state);
+        }
+      }
+    }
+
+    // Unregister beans for per-instance resources that no longer exist
+    Set<PerInstanceResourceMonitor.BeanName> toUnregister =
+        Sets.newHashSet(_perInstanceResourceMap.keySet());
+    toUnregister.removeAll(beanMap.keySet());
+    try {
+      unregisterPerInstanceResources(toUnregister);
+    } catch (MalformedObjectNameException e) {
+      LOG.error("Fail to unregister per-instance resource from MBean server: " + toUnregister, e);
+    }
+
+    // Register beans for per-instance resources that are newly configured
+    Set<PerInstanceResourceMonitor.BeanName> toRegister = Sets.newHashSet(beanMap.keySet());
+    toRegister.removeAll(_perInstanceResourceMap.keySet());
+    Set<PerInstanceResourceMonitor> monitorsToRegister = Sets.newHashSet();
+    for (PerInstanceResourceMonitor.BeanName beanName : toRegister) {
+      PerInstanceResourceMonitor bean =
+          new PerInstanceResourceMonitor(_clusterName, beanName.instanceName(),
+              beanName.resourceName());
+      String stateModelDefName = resourceMap.get(beanName.resourceName()).getStateModelDefRef();
+      InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
+      bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+          stateModelDefMap.get(stateModelDefName));
+      monitorsToRegister.add(bean);
+    }
+
+    try {
+      registerPerInstanceResources(monitorsToRegister);
+    } catch (MalformedObjectNameException e) {
+      LOG.error("Fail to register per-instance resource with MBean server: " + toRegister, e);
+    }
+
+    // Update existing beans
+    for (PerInstanceResourceMonitor.BeanName beanName : _perInstanceResourceMap.keySet()) {
+      PerInstanceResourceMonitor bean = _perInstanceResourceMap.get(beanName);
+      String stateModelDefName = resourceMap.get(beanName.resourceName()).getStateModelDefRef();
+      InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
+      bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+          stateModelDefMap.get(stateModelDefName));
+    }
+  }
+
+  public void setResourceStatus(ExternalView externalView, IdealState idealState) {
     try {
       String resourceName = externalView.getId();
       if (!_resourceMbeanMap.containsKey(resourceName)) {
         synchronized (this) {
           if (!_resourceMbeanMap.containsKey(resourceName)) {
             ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
-            bean.updateExternalView(externalView, idealState);
+            bean.updateResource(externalView, idealState);
             registerResources(Arrays.asList(bean));
           }
         }
       }
       ResourceMonitor bean = _resourceMbeanMap.get(resourceName);
       String oldSensorName = bean.getSensorName();
-      bean.updateExternalView(externalView, idealState);
+      bean.updateResource(externalView, idealState);
       String newSensorName = bean.getSensorName();
       if (!oldSensorName.equals(newSensorName)) {
         unregisterResources(Arrays.asList(resourceName));
         registerResources(Arrays.asList(bean));
       }
     } catch (Exception e) {
-      LOG.warn(e);
+      LOG.error("Fail to set resource status", e);
     }
   }
 
@@ -264,18 +349,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       }
       _instanceMsgQueueMbeanMap.get(instanceName).addMessageQueueSize(msgQueueSize);
     } catch (Exception e) {
-      LOG.warn("fail to add message queue size to mbean", e);
+      LOG.error("Fail to add message queue size to mbean", e);
     }
   }
 
   public void reset() {
-    LOG.info("Resetting ClusterStatusMonitor");
+    LOG.info("Reset ClusterStatusMonitor");
     try {
-      for (String resourceName : _resourceMbeanMap.keySet()) {
-        String beanName =
-            CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
-        unregister(getObjectName(beanName));
-      }
+      unregisterResources(_resourceMbeanMap.keySet());
       _resourceMbeanMap.clear();
 
       for (MessageQueueMonitor bean : _instanceMsgQueueMbeanMap.values()) {
@@ -286,9 +367,11 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       unregisterInstances(_instanceMbeanMap.keySet());
       _instanceMbeanMap.clear();
 
-      unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+      unregisterPerInstanceResources(_perInstanceResourceMap.keySet());
+
+      unregister(getObjectName(clusterBeanName()));
     } catch (Exception e) {
-      LOG.error("fail to reset ClusterStatusMonitor", e);
+      LOG.error("Fail to reset ClusterStatusMonitor", e);
     }
   }
 
@@ -311,6 +394,28 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     _instanceMbeanMap.keySet().removeAll(instances);
   }
 
+  private synchronized void registerPerInstanceResources(
+      Collection<PerInstanceResourceMonitor> monitors) throws MalformedObjectNameException {
+    for (PerInstanceResourceMonitor monitor : monitors) {
+      String instanceName = monitor.getInstanceName();
+      String resourceName = monitor.getResourceName();
+      String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
+      register(monitor, getObjectName(beanName));
+      _perInstanceResourceMap.put(new PerInstanceResourceMonitor.BeanName(instanceName,
+          resourceName), monitor);
+    }
+  }
+
+  private synchronized void unregisterPerInstanceResources(
+      Collection<PerInstanceResourceMonitor.BeanName> beanNames)
+      throws MalformedObjectNameException {
+    for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
+      unregister(getObjectName(getPerInstanceResourceBeanName(beanName.instanceName(),
+          beanName.resourceName())));
+    }
+    _perInstanceResourceMap.keySet().removeAll(beanNames);
+  }
+
   private synchronized void registerResources(Collection<ResourceMonitor> resources)
       throws MalformedObjectNameException {
     for (ResourceMonitor monitor : resources) {
@@ -330,12 +435,38 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     _resourceMbeanMap.keySet().removeAll(resources);
   }
 
+  public String clusterBeanName() {
+    return String.format("%s=%s", CLUSTER_DN_KEY, _clusterName);
+  }
+
+  /**
+   * Build instance bean name
+   * @param instanceName
+   * @return instance bean name
+   */
   private String getInstanceBeanName(String instanceName) {
-    return CLUSTER_DN_KEY + "=" + _clusterName + "," + INSTANCE_DN_KEY + "=" + instanceName;
+    return String.format("%s,%s=%s", clusterBeanName(), INSTANCE_DN_KEY, instanceName);
   }
 
+  /**
+   * Build resource bean name
+   * @param resourceName
+   * @return resource bean name
+   */
   private String getResourceBeanName(String resourceName) {
-    return CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
+    return String.format("%s,%s=%s", clusterBeanName(), RESOURCE_DN_KEY, resourceName);
+  }
+
+  /**
+   * Build per-instance resource bean name:
+   * "cluster={clusterName},instanceName={instanceName},resourceName={resourceName}"
+   * @param instanceName
+   * @param resourceName
+   * @return per-instance resource bean name
+   */
+  public String getPerInstanceResourceBeanName(String instanceName, String resourceName) {
+    return String.format("%s,%s", clusterBeanName(), new PerInstanceResourceMonitor.BeanName(
+        instanceName, resourceName).toString());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index 1385568..d9875cc 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -54,8 +54,8 @@ public class InstanceMonitor implements InstanceMonitorMBean {
 
   @Override
   public String getSensorName() {
-    return ClusterStatusMonitor.PARTICIPANT_STATUS_KEY + "." + _clusterName + "."
-        + serializedTags() + "." + _participantName;
+    return String.format("%s.%s.%s.%s", ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
+        serializedTags(), _participantName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
new file mode 100644
index 0000000..714767b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
@@ -0,0 +1,146 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class PerInstanceResourceMonitor implements PerInstanceResourceMonitorMBean {
+  public static class BeanName {
+    private final String _instanceName;
+    private final String _resourceName;
+
+    public BeanName(String instanceName, String resourceName) {
+      if (instanceName == null || resourceName == null) {
+        throw new NullPointerException("Illegal beanName. instanceName: " + instanceName
+            + ", resourceName: " + resourceName);
+      }
+      _instanceName = instanceName;
+      _resourceName = resourceName;
+    }
+
+    public String instanceName() {
+      return _instanceName;
+    }
+
+    public String resourceName() {
+      return _resourceName;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || !(obj instanceof BeanName)) {
+        return false;
+      }
+
+      BeanName that = (BeanName) obj;
+      return _instanceName.equals(that._instanceName) && _resourceName.equals(that._resourceName);
+    }
+
+    @Override
+    public int hashCode() {
+      return toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%s=%s,%s=%s", ClusterStatusMonitor.INSTANCE_DN_KEY, _instanceName,
+          ClusterStatusMonitor.RESOURCE_DN_KEY, _resourceName);
+    }
+  }
+
+  private final String _clusterName;
+  private List<String> _tags;
+  private final String _participantName;
+  private final String _resourceName;
+  private long _partitions;
+
+  public PerInstanceResourceMonitor(String clusterName, String participantName, String resourceName) {
+    _clusterName = clusterName;
+    _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+    _participantName = participantName;
+    _resourceName = resourceName;
+    _partitions = 0;
+  }
+
+  @Override
+  public String getSensorName() {
+    return Joiner
+        .on('.')
+        .join(
+            ImmutableList.of(ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
+                serializedTags(), _participantName, _resourceName)).toString();
+  }
+
+  private String serializedTags() {
+    return Joiner.on('|').skipNulls().join(_tags).toString();
+  }
+
+  @Override
+  public long getPartitionGauge() {
+    return _partitions;
+  }
+
+  public String getInstanceName() {
+    return _participantName;
+  }
+
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  /**
+   * Update per-instance resource bean
+   * @param stateMap partition->state
+   * @param tags instance tags
+   * @param stateModelDef
+   */
+  public synchronized void update(Map<Partition, String> stateMap, Set<String> tags,
+      StateModelDefinition stateModelDef) {
+    if (tags == null || tags.isEmpty()) {
+      _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+    } else {
+      _tags = Lists.newArrayList(tags);
+      Collections.sort(_tags);
+    }
+
+    int cnt = 0;
+    for (String state : stateMap.values()) {
+      // Skip DROPPED and initial state (e.g. OFFLINE)
+      if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.name())
+          || state.equalsIgnoreCase(stateModelDef.getInitialState())) {
+        continue;
+      }
+      cnt++;
+    }
+    _partitions = cnt;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
new file mode 100644
index 0000000..4b544b1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
@@ -0,0 +1,34 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+/**
+ * A bean that describes the resource on each instance
+ */
+public interface PerInstanceResourceMonitorMBean extends SensorNameProvider {
+  /**
+   * Get the number of partitions of the resource assigned to the instance
+   * in the best possible state
+   * @return number of partitions
+   */
+  long getPartitionGauge();
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index a07c79d..7304033 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -27,14 +27,15 @@ import org.apache.helix.model.IdealState;
 import org.apache.log4j.Logger;
 
 public class ResourceMonitor implements ResourceMonitorMBean {
-  private int _numOfPartitions;
-  int _numOfPartitionsInExternalView;
-  int _numOfErrorPartitions;
-  int _externalViewIdealStateDiff;
-  String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
 
-  String _resourceName, _clusterName;
+  private int _numOfPartitions;
+  private int _numOfPartitionsInExternalView;
+  private int _numOfErrorPartitions;
+  private int _externalViewIdealStateDiff;
+  private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
+  private final String _clusterName;
+  private final String _resourceName;
 
   public ResourceMonitor(String clusterName, String resourceName) {
     _clusterName = clusterName;
@@ -58,15 +59,15 @@ public class ResourceMonitor implements ResourceMonitorMBean {
 
   @Override
   public String getSensorName() {
-    return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "." + _clusterName + "." + _tag + "."
-        + _resourceName;
+    return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName,
+        _tag, _resourceName);
   }
 
   public String getResourceName() {
     return _resourceName;
   }
 
-  public void updateExternalView(ExternalView externalView, IdealState idealState) {
+  public void updateResource(ExternalView externalView, IdealState idealState) {
     if (externalView == null) {
       LOG.warn("external view is null");
       return;
@@ -91,7 +92,7 @@ public class ResourceMonitor implements ResourceMonitorMBean {
     }
 
     // TODO fix this; IdealState shall have either map fields (CUSTOM mode)
-    // or list fields (AUDO mode)
+    // or list fields (AUTO mode)
     for (String partitionName : idealState.getRecord().getMapFields().keySet()) {
       Map<String, String> idealRecord = idealState.getInstanceStateMap(partitionName);
       Map<String, String> externalViewRecord = externalView.getStateMap(partitionName);

http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 45bc709..58b691a 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -18,12 +18,14 @@ import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
+  private static final Logger LOG = Logger.getLogger(TestClusterStatusMonitorLifecycle.class);
 
   MockParticipantManager[] _participants;
   ClusterDistributedController[] _controllers;
@@ -157,12 +159,14 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     @Override
     public void onMBeanRegistered(MBeanServerConnection server,
         MBeanServerNotification mbsNotification) {
+      LOG.info("Register mbean: " + mbsNotification.getMBeanName());
       _nMbeansRegistered++;
     }
 
     @Override
     public void onMBeanUnRegistered(MBeanServerConnection server,
         MBeanServerNotification mbsNotification) {
+      LOG.info("Unregister mbean: " + mbsNotification.getMBeanName());
       _nMbeansUnregistered++;
     }
   }
@@ -177,9 +181,11 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
 
     _participants[0].disconnect();
 
-    // participant goes away. should be no change in number of beans as config is still present
+    // 1 participant goes away
+    // No change in instance/resource mbean (instance config is still present)
+    // Unregister 1 per-instance resource mbean
     Thread.sleep(1000);
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 1);
     Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);
 
     HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
@@ -196,19 +202,25 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     Thread.sleep(1000);
 
     // 1 cluster status monitor, 1 resource monitor, 5 instances
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 7);
+    // Unregister 1+4+1 per-instance resource mbean
+    // Register 4 per-instance resource mbean
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 11);
 
     String instanceName = "localhost0_" + (12918 + 0);
     _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
     _participants[0].syncStart();
 
-    // participant goes back. should be no change
+    // 1 participant comes back
+    // No change in instance/resource mbean
+    // Register 1 per-instance resource mbean
     Thread.sleep(1000);
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 7);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 12);
 
-    // Add a resource, one more mbean registered
+    // Add a resource
+    // Register 1 resource mbean
+    // Register 5 per-instance resource mbean
     ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
     IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates("TestDB00"));
 
@@ -218,14 +230,16 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
         Integer.parseInt(idealState.getReplicas()));
 
     Thread.sleep(1000);
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 8);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
 
-    // remove resource, no change
+    // Remove a resource
+    // No change in instance/resource mbean
+    // Unregister 5 per-instance resource mbean
     setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
     Thread.sleep(1000);
-    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 7);
-    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 8);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 18);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
 
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index 711aff2..723d969 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -19,101 +19,145 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import java.util.ArrayList;
+import java.lang.management.ManagementFactory;
 import java.util.Date;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.Mocks;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.apache.helix.tools.DefaultIdealStateCalculator;
+import java.util.Map;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.beust.jcommander.internal.Maps;
+
 public class TestClusterStatusMonitor {
-  List<String> _instances;
-  List<ZNRecord> _liveInstances;
-  String _db = "DB";
-  String _db2 = "TestDB";
-  int _replicas = 3;
-  int _partitions = 50;
-  ZNRecord _externalView, _externalView2;
-
-  class MockDataAccessor extends Mocks.MockAccessor {
-    public MockDataAccessor() {
-      _instances = new ArrayList<String>();
-      for (int i = 0; i < 5; i++) {
-        String instance = "localhost_" + (12918 + i);
-        _instances.add(instance);
-      }
-      ZNRecord externalView =
-          DefaultIdealStateCalculator.calculateIdealState(_instances, _partitions, _replicas, _db,
-              "MASTER", "SLAVE");
-
-      ZNRecord externalView2 =
-          DefaultIdealStateCalculator.calculateIdealState(_instances, 80, 2, _db2, "MASTER",
-              "SLAVE");
+  private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
 
+  @Test()
+  public void testReportData() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 5;
+    String testDB = "TestDB";
+    String testDB_0 = testDB + "_0";
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+    ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+    try {
+      _server.getMBeanInfo(clusterMonitorObjName);
+    } catch (Exception e) {
+      Assert.fail("Fail to register ClusterStatusMonitor");
     }
 
-    public ZNRecord getProperty(PropertyType type, String resource) {
-      if (type == PropertyType.IDEALSTATES || type == PropertyType.EXTERNALVIEW) {
-        if (resource.equals(_db)) {
-          return _externalView;
-        } else if (resource.equals(_db2)) {
-          return _externalView2;
-        }
-      }
-      return null;
+    // Test #setPerInstanceResourceStatus()
+    BestPossibleStateOutput bestPossibleStates = new BestPossibleStateOutput();
+    bestPossibleStates.setState(testDB, new Partition(testDB_0), "localhost_12918", "MASTER");
+    bestPossibleStates.setState(testDB, new Partition(testDB_0), "localhost_12919", "SLAVE");
+    bestPossibleStates.setState(testDB, new Partition(testDB_0), "localhost_12920", "SLAVE");
+    bestPossibleStates.setState(testDB, new Partition(testDB_0), "localhost_12921", "OFFLINE");
+    bestPossibleStates.setState(testDB, new Partition(testDB_0), "localhost_12922", "DROPPED");
+
+    Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      InstanceConfig config = new InstanceConfig(instanceName);
+      instanceConfigMap.put(instanceName, config);
     }
-  }
 
-  class MockHelixManager extends Mocks.MockManager {
-    MockDataAccessor _accessor = new MockDataAccessor();
-
-    @Override
-    public HelixDataAccessor getHelixDataAccessor() {
-      return _accessor;
+    Map<String, Resource> resourceMap = Maps.newHashMap();
+    Resource db = new Resource(testDB);
+    db.setStateModelDefRef("MasterSlave");
+    db.addPartition(testDB_0);
+    resourceMap.put(testDB, db);
+
+    Map<String, StateModelDefinition> stateModelDefMap = Maps.newHashMap();
+    StateModelDefinition msStateModelDef =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    stateModelDefMap.put("MasterSlave", msStateModelDef);
+
+    monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
+        stateModelDefMap);
+
+    // localhost_12918 should have 1 partition because it's MASTER
+    ObjectName objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+    Object value = _server.getAttribute(objName, "PartitionGauge");
+    Assert.assertTrue(value instanceof Long);
+    Assert.assertEquals((Long) value, new Long(1));
+    value = _server.getAttribute(objName, "SensorName");
+    Assert.assertTrue(value instanceof String);
+    Assert.assertEquals((String) value, String.format("%s.%s.%s.%s.%s",
+        ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, clusterName, ClusterStatusMonitor.DEFAULT_TAG,
+        "localhost_12918", testDB));
+
+    // localhost_12919 should have 1 partition because it's SLAVE
+    objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12919", testDB));
+    value = _server.getAttribute(objName, "PartitionGauge");
+    Assert.assertTrue(value instanceof Long);
+    Assert.assertEquals((Long) value, new Long(1));
+
+    // localhost_12921 should have 0 partitions because it's OFFLINE
+    objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12921", testDB));
+    value = _server.getAttribute(objName, "PartitionGauge");
+    Assert.assertTrue(value instanceof Long);
+    Assert.assertEquals((Long) value, new Long(0));
+
+    // localhost_12922 should have 0 partitions because it's DROPPED
+    objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12922", testDB));
+    value = _server.getAttribute(objName, "PartitionGauge");
+    Assert.assertTrue(value instanceof Long);
+    Assert.assertEquals((Long) value, new Long(0));
+
+    // Removing localhost_12918 from the best possible state should unregister its mbean
+    bestPossibleStates.getInstanceStateMap(testDB, new Partition(testDB_0)).remove(
+        "localhost_12918");
+    monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
+        stateModelDefMap);
+    try {
+      objName =
+          monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+      _server.getMBeanInfo(objName);
+      Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12918");
+
+    } catch (InstanceNotFoundException e) {
+      // OK
     }
 
-  }
+    // Clean up
+    monitor.reset();
 
-  @Test()
-  public void TestReportData() {
-    System.out.println("START TestClusterStatusMonitor at" + new Date(System.currentTimeMillis()));
-    List<String> _instances;
-    List<ZNRecord> _liveInstances = new ArrayList<ZNRecord>();
-    String _db = "DB";
-    int _replicas = 3;
-    int _partitions = 50;
-
-    _instances = new ArrayList<String>();
-    for (int i = 0; i < 5; i++) {
-      String instance = "localhost_" + (12918 + i);
-      _instances.add(instance);
-      ZNRecord metaData = new ZNRecord(instance);
-      metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID()
-          .toString());
-      _liveInstances.add(metaData);
+    try {
+      objName =
+          monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12920", testDB));
+      _server.getMBeanInfo(objName);
+      Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12920");
+
+    } catch (InstanceNotFoundException e) {
+      // OK
     }
-    ZNRecord externalView =
-        DefaultIdealStateCalculator.calculateIdealState(_instances, _partitions, _replicas, _db,
-            "MASTER", "SLAVE");
-
-    ZNRecord externalView2 =
-        DefaultIdealStateCalculator.calculateIdealState(_instances, 80, 2, "TestDB", "MASTER",
-            "SLAVE");
-
-    List<ZNRecord> externalViews = new ArrayList<ZNRecord>();
-    externalViews.add(externalView);
-    externalViews.add(externalView2);
-
-    ClusterStatusMonitor monitor = new ClusterStatusMonitor("cluster1");
-    MockHelixManager manager = new MockHelixManager();
-    NotificationContext context = new NotificationContext(manager);
-    System.out.println("END TestClusterStatusMonitor at" + new Date(System.currentTimeMillis()));
+
+    try {
+      _server.getMBeanInfo(clusterMonitorObjName);
+      Assert.fail("Fail to unregister ClusterStatusMonitor");
+    } catch (InstanceNotFoundException e) {
+      // OK
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9661fd2f/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index d631dd2..652eb88 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -22,21 +22,13 @@ package org.apache.helix.monitoring.mbeans;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixProperty;
-import org.apache.helix.Mocks;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
 import org.apache.helix.monitoring.mbeans.ResourceMonitor;
 import org.apache.helix.tools.DefaultIdealStateCalculator;
-import org.testng.AssertJUnit;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestResourceMonitor {
@@ -45,108 +37,53 @@ public class TestResourceMonitor {
   int _replicas = 3;
   int _partitions = 50;
 
-  class MockHelixManager extends Mocks.MockManager {
-    class MockDataAccessor extends Mocks.MockAccessor {
-      @Override
-      public <T extends HelixProperty> List<T> getChildValues(PropertyKey key) {
-        List<T> result = new ArrayList<T>();
-        PropertyType type = key.getType();
-        Class<? extends HelixProperty> clazz = key.getTypeClass();
-        if (type == PropertyType.EXTERNALVIEW) {
-          HelixProperty typedInstance = HelixProperty.convertToTypedInstance(clazz, _externalView);
-          result.add((T) typedInstance);
-          return result;
-        } else if (type == PropertyType.LIVEINSTANCES) {
-          return (List<T>) HelixProperty.convertToTypedList(clazz, _liveInstances);
-        }
-
-        return result;
-      }
-
-      @Override
-      public <T extends HelixProperty> T getProperty(PropertyKey key) {
-        PropertyType type = key.getType();
-        if (type == PropertyType.EXTERNALVIEW) {
-          return (T) new ExternalView(_externalView);
-        } else if (type == PropertyType.IDEALSTATES) {
-          return (T) new IdealState(_idealState);
-        }
-        return null;
-      }
-    }
-
-    HelixDataAccessor _accessor = new MockDataAccessor();
-    ZNRecord _idealState;
-    ZNRecord _externalView;
-    List<String> _instances;
-    List<ZNRecord> _liveInstances;
-    String _db = "DB";
-
-    public MockHelixManager() {
-      _liveInstances = new ArrayList<ZNRecord>();
-      _instances = new ArrayList<String>();
-      for (int i = 0; i < 5; i++) {
-        String instance = "localhost_" + (12918 + i);
-        _instances.add(instance);
-        ZNRecord metaData = new ZNRecord(instance);
-        metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID()
-            .toString());
-
-      }
-      _idealState =
-          DefaultIdealStateCalculator.calculateIdealState(_instances, _partitions, _replicas,
-              _dbName, "MASTER", "SLAVE");
-      _externalView = new ZNRecord(_idealState);
-    }
-
-    @Override
-    public HelixDataAccessor getHelixDataAccessor() {
-      return _accessor;
-    }
-
-  }
-
   @Test()
-  public void TestReportData() {
-    MockHelixManager manager = new MockHelixManager();
+  public void testReportData() {
+    final int n = 5;
     ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName);
 
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    ExternalView externalView = helixDataAccessor.getProperty(keyBuilder.externalView(_dbName));
-    IdealState idealState = helixDataAccessor.getProperty(keyBuilder.idealStates(_dbName));
+    List<String> instances = new ArrayList<String>();
+    for (int i = 0; i < n; i++) {
+      String instance = "localhost_" + (12918 + i);
+      instances.add(instance);
+    }
 
-    monitor.updateExternalView(externalView, idealState);
+    ZNRecord idealStateRecord =
+        DefaultIdealStateCalculator.calculateIdealState(instances, _partitions, _replicas, _dbName,
+            "MASTER", "SLAVE");
+    IdealState idealState = new IdealState(idealStateRecord);
+    ExternalView externalView = new ExternalView(idealStateRecord);
 
-    AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
-    AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), 0);
-    AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
-    AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
-    monitor.getBeanName();
+    monitor.updateResource(externalView, idealState);
 
-    int n = 4;
-    for (int i = 0; i < n; i++) {
+    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+    Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+    // monitor.getBeanName();
+
+    final int m = n - 1;
+    for (int i = 0; i < m; i++) {
       Map<String, String> map = externalView.getStateMap(_dbName + "_" + 3 * i);
       String key = map.keySet().toArray()[0].toString();
       map.put(key, "ERROR");
       externalView.setStateMap(_dbName + "_" + 3 * i, map);
     }
 
-    monitor.updateExternalView(externalView, idealState);
-    AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
-    AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), n);
-    AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
-    AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
+    monitor.updateResource(externalView, idealState);
+    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+    Assert.assertEquals(monitor.getErrorPartitionGauge(), m);
+    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
 
-    n = 5;
     for (int i = 0; i < n; i++) {
       externalView.getRecord().getMapFields().remove(_dbName + "_" + 4 * i);
     }
 
-    monitor.updateExternalView(externalView, idealState);
-    AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
-    AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), 3);
-    AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
-    AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
+    monitor.updateResource(externalView, idealState);
+    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
+    Assert.assertEquals(monitor.getErrorPartitionGauge(), 3);
+    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
+    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
   }
 }