You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/11 19:13:19 UTC

[5/7] git commit: [HELIX-444] add per-participant partition count gauges to helix, rb=21419

[HELIX-444] add per-participant partition count gauges to helix, rb=21419


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/96aef71c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/96aef71c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/96aef71c

Branch: refs/heads/master
Commit: 96aef71c899dc1f3956e1211fc1e9a7459a258d1
Parents: 8527729
Author: zzhang <zz...@apache.org>
Authored: Wed May 21 15:56:57 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Jul 10 14:53:41 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/api/State.java   |   6 +-
 .../stages/BestPossibleStateCalcStage.java      |   9 +
 .../controller/stages/ClusterDataCache.java     |  42 +---
 .../stages/ExternalViewComputeStage.java        |  23 +-
 .../monitoring/mbeans/ClusterStatusMonitor.java | 182 +++++++++++++--
 .../monitoring/mbeans/InstanceMonitor.java      |   4 +-
 .../mbeans/PerInstanceResourceMonitor.java      | 147 +++++++++++++
 .../mbeans/PerInstanceResourceMonitorMBean.java |  34 +++
 .../monitoring/mbeans/ResourceMonitor.java      |  23 +-
 .../TestClusterStatusMonitorLifecycle.java      |  42 ++--
 .../mbeans/TestClusterStatusMonitor.java        | 220 ++++++++++++-------
 .../monitoring/mbeans/TestResourceMonitor.java  |  59 ++---
 12 files changed, 586 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/api/State.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/State.java b/helix-core/src/main/java/org/apache/helix/api/State.java
index 3315987..aa98df2 100644
--- a/helix-core/src/main/java/org/apache/helix/api/State.java
+++ b/helix-core/src/main/java/org/apache/helix/api/State.java
@@ -47,9 +47,11 @@ public class State {
   @Override
   public boolean equals(Object that) {
     if (that instanceof State) {
-      return this.toString().equals(((State) that).toString());
+      return this.toString().equalsIgnoreCase(((State) that).toString());
     } else if (that instanceof String) {
-      return _state.equals(that);
+      return _state.equalsIgnoreCase(that.toString());
+    } else if (that instanceof HelixDefinedState) {
+      return _state.equalsIgnoreCase(that.toString());
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 644b9f6..2f93b7f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -42,6 +42,7 @@ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
@@ -69,6 +70,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
     Cluster cluster = event.getAttribute("Cluster");
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
 
     if (currentStateOutput == null || resourceMap == null || cluster == null) {
       throw new StageException("Missing attributes in event:" + event
@@ -79,6 +81,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
         compute(cluster, event, resourceMap, currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
 
+    ClusterStatusMonitor clusterStatusMonitor =
+        (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.setPerInstanceResourceStatus(bestPossibleStateOutput,
+          cache.getInstanceConfigMap(), resourceMap, cache.getStateModelDefMap());
+    }
+
     long endTime = System.currentTimeMillis();
     if (LOG.isInfoEnabled()) {
       LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 0c28bdf..8bcfaae 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -79,12 +79,6 @@ public class ClusterDataCache {
 
   boolean _init = true;
 
-  // Map<String, Map<String, HealthStat>> _healthStatMap;
-  // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE
-  // private PersistentStats _persistentStats;
-  // private Alerts _alerts;
-  // private AlertStatus _alertStatus;
-
   private static final Logger LOG = Logger.getLogger(ClusterDataCache.class.getName());
 
   /**
@@ -334,37 +328,6 @@ public class ClusterDataCache {
     return _messageMap;
   }
 
-  // public HealthStat getGlobalStats()
-  // {
-  // return _globalStats;
-  // }
-  //
-  // public PersistentStats getPersistentStats()
-  // {
-  // return _persistentStats;
-  // }
-  //
-  // public Alerts getAlerts()
-  // {
-  // return _alerts;
-  // }
-  //
-  // public AlertStatus getAlertStatus()
-  // {
-  // return _alertStatus;
-  // }
-  //
-  // public Map<String, HealthStat> getHealthStats(String instanceName)
-  // {
-  // Map<String, HealthStat> map = _healthStatMap.get(instanceName);
-  // if (map != null)
-  // {
-  // return map;
-  // } else
-  // {
-  // return Collections.emptyMap();
-  // }
-  // }
   /**
    * Provides the state model definition for a given state model
    * @param stateModelDefRef
@@ -375,8 +338,13 @@ public class ClusterDataCache {
   }
 
   /**
+<<<<<<< HEAD
    * Get all state model definitions
    * @return map of name to state model definition
+=======
+   * Provides all state model definitions
+   * @return state model definition map
+>>>>>>> 8d5c27c... [HELIX-444] add per-participant partition count gauges to helix, rb=21419
    */
   public Map<String, StateModelDefinition> getStateModelDefMap() {
     return _stateModelDefMap;

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index a46acbd..3086a83 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -35,7 +35,6 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordDelta;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
 import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Resource;
 import org.apache.helix.api.State;
 import org.apache.helix.api.config.ResourceConfig;
 import org.apache.helix.api.config.SchedulerTaskConfig;
@@ -46,6 +45,7 @@ import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
@@ -66,10 +66,11 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
     Cluster cluster = event.getAttribute("Cluster");
+    ClusterDataCache cache = event.getAttribute("ClusterDataCache");
 
-    if (manager == null || resourceMap == null || cluster == null) {
+    if (manager == null || resourceMap == null || cluster == null || cache == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires ClusterManager|RESOURCES|Cluster");
+          + ". Requires ClusterManager|RESOURCES|Cluster|ClusterDataCache");
     }
 
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
@@ -118,15 +119,13 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
       // Update cluster status monitor mbean
       ClusterStatusMonitor clusterStatusMonitor =
           (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
-      Resource currentResource = cluster.getResourceMap().get(view.getResourceId());
-      if (clusterStatusMonitor != null && currentResource != null) {
-        IdealState idealState = currentResource.getIdealState();
-        if (idealState != null) {
-          StateModelDefId stateModelDefId = idealState.getStateModelDefId();
-          if (stateModelDefId != null
-              && !stateModelDefId.equals(StateModelDefId.SchedulerTaskQueue)) {
-            clusterStatusMonitor.onExternalViewChange(view, idealState);
-          }
+      IdealState idealState = cache._idealStateMap.get(view.getResourceName());
+      if (idealState != null) {
+        if (clusterStatusMonitor != null
+            && !idealState.getStateModelDefRef().equalsIgnoreCase(
+                DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+          clusterStatusMonitor.setResourceStatus(view,
+              cache._idealStateMap.get(view.getResourceName()));
         }
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index b468856..99dee75 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -32,8 +33,18 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Sets;
@@ -41,15 +52,15 @@ import com.google.common.collect.Sets;
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private static final Logger LOG = Logger.getLogger(ClusterStatusMonitor.class);
 
-  static final String CLUSTER_STATUS_KEY = "ClusterStatus";
+  public static final String CLUSTER_STATUS_KEY = "ClusterStatus";
   static final String MESSAGE_QUEUE_STATUS_KEY = "MessageQueueStatus";
   static final String RESOURCE_STATUS_KEY = "ResourceStatus";
-  static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
+  public static final String PARTICIPANT_STATUS_KEY = "ParticipantStatus";
   static final String CLUSTER_DN_KEY = "cluster";
   static final String RESOURCE_DN_KEY = "resourceName";
   static final String INSTANCE_DN_KEY = "instanceName";
 
-  static final String DEFAULT_TAG = "DEFAULT";
+  public static final String DEFAULT_TAG = "DEFAULT";
 
   private final String _clusterName;
   private final MBeanServer _beanServer;
@@ -68,20 +79,27 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private final ConcurrentHashMap<String, InstanceMonitor> _instanceMbeanMap =
       new ConcurrentHashMap<String, InstanceMonitor>();
 
+  /**
+   * PerInstanceResource bean map: beanName->bean
+   */
+  private final Map<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor> _perInstanceResourceMap =
+      new ConcurrentHashMap<PerInstanceResourceMonitor.BeanName, PerInstanceResourceMonitor>();
+
   public ClusterStatusMonitor(String clusterName) {
     _clusterName = clusterName;
     _beanServer = ManagementFactory.getPlatformMBeanServer();
     try {
-      register(this, getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+      register(this, getObjectName(clusterBeanName()));
     } catch (Exception e) {
-      LOG.error("Register self failed.", e);
+      LOG.error("Fail to regiter ClusterStatusMonitor", e);
     }
   }
 
   public ObjectName getObjectName(String name) throws MalformedObjectNameException {
-    return new ObjectName(CLUSTER_STATUS_KEY + ": " + name);
+    return new ObjectName(String.format("%s: %s", CLUSTER_STATUS_KEY, name));
   }
 
+  // TODO remove getBeanName()?
   // Used by other external JMX consumers like ingraph
   public String getBeanName() {
     return CLUSTER_STATUS_KEY + " " + _clusterName;
@@ -144,10 +162,10 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
 
     try {
-      LOG.info("Registering " + name.toString());
+      LOG.info("Register MBean: " + name);
       _beanServer.registerMBean(bean, name);
     } catch (Exception e) {
-      LOG.warn("Could not register MBean" + name, e);
+      LOG.warn("Could not register MBean: " + name, e);
     }
   }
 
@@ -158,7 +176,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
         _beanServer.unregisterMBean(name);
       }
     } catch (Exception e) {
-      LOG.warn("Could not unregister MBean" + name, e);
+      LOG.warn("Could not unregister MBean: " + name, e);
     }
   }
 
@@ -227,28 +245,98 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public void onExternalViewChange(ExternalView externalView, IdealState idealState) {
+  /**
+   * Update gauges for resource at instance level
+   * @param bestPossibleStates
+   * @param resourceMap
+   * @param stateModelDefMap
+   */
+  public void setPerInstanceResourceStatus(BestPossibleStateOutput bestPossibleStates,
+      Map<String, InstanceConfig> instanceConfigMap, Map<ResourceId, ResourceConfig> resourceMap,
+      Map<String, StateModelDefinition> stateModelDefMap) {
+
+    // Convert to perInstanceResource beanName->partition->state
+    Map<PerInstanceResourceMonitor.BeanName, Map<PartitionId, State>> beanMap =
+        new HashMap<PerInstanceResourceMonitor.BeanName, Map<PartitionId, State>>();
+    for (ResourceId resource : bestPossibleStates.getAssignedResources()) {
+      ResourceAssignment assignment = bestPossibleStates.getResourceAssignment(resource);
+      for (PartitionId partition : assignment.getMappedPartitionIds()) {
+        Map<ParticipantId, State> instanceStateMap = assignment.getReplicaMap(partition);
+        for (ParticipantId instance : instanceStateMap.keySet()) {
+          State state = instanceStateMap.get(instance);
+          PerInstanceResourceMonitor.BeanName beanName =
+              new PerInstanceResourceMonitor.BeanName(instance.toString(), resource.toString());
+          if (!beanMap.containsKey(beanName)) {
+            beanMap.put(beanName, new HashMap<PartitionId, State>());
+          }
+          beanMap.get(beanName).put(partition, state);
+        }
+      }
+    }
+    // Unregister beans for per-instance resources that no longer exist
+    Set<PerInstanceResourceMonitor.BeanName> toUnregister =
+        Sets.newHashSet(_perInstanceResourceMap.keySet());
+    toUnregister.removeAll(beanMap.keySet());
+    try {
+      unregisterPerInstanceResources(toUnregister);
+    } catch (MalformedObjectNameException e) {
+      LOG.error("Fail to unregister per-instance resource from MBean server: " + toUnregister, e);
+    }
+    // Register beans for per-instance resources that are newly configured
+    Set<PerInstanceResourceMonitor.BeanName> toRegister = Sets.newHashSet(beanMap.keySet());
+    toRegister.removeAll(_perInstanceResourceMap.keySet());
+    Set<PerInstanceResourceMonitor> monitorsToRegister = Sets.newHashSet();
+    for (PerInstanceResourceMonitor.BeanName beanName : toRegister) {
+      PerInstanceResourceMonitor bean =
+          new PerInstanceResourceMonitor(_clusterName, beanName.instanceName(),
+              beanName.resourceName());
+      StateModelDefId stateModelDefId =
+          resourceMap.get(ResourceId.from(beanName.resourceName())).getRebalancerConfig()
+              .getStateModelDefId();
+      InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
+      bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+          stateModelDefMap.get(stateModelDefId.toString()));
+      monitorsToRegister.add(bean);
+    }
+    try {
+      registerPerInstanceResources(monitorsToRegister);
+    } catch (MalformedObjectNameException e) {
+      LOG.error("Fail to register per-instance resource with MBean server: " + toRegister, e);
+    }
+    // Update existing beans
+    for (PerInstanceResourceMonitor.BeanName beanName : _perInstanceResourceMap.keySet()) {
+      PerInstanceResourceMonitor bean = _perInstanceResourceMap.get(beanName);
+      StateModelDefId stateModelDefId =
+          resourceMap.get(ResourceId.from(beanName.resourceName())).getRebalancerConfig()
+              .getStateModelDefId();
+      InstanceConfig config = instanceConfigMap.get(beanName.instanceName());
+      bean.update(beanMap.get(beanName), Sets.newHashSet(config.getTags()),
+          stateModelDefMap.get(stateModelDefId.toString()));
+    }
+  }
+
+  public void setResourceStatus(ExternalView externalView, IdealState idealState) {
     try {
       String resourceName = externalView.getId();
       if (!_resourceMbeanMap.containsKey(resourceName)) {
         synchronized (this) {
           if (!_resourceMbeanMap.containsKey(resourceName)) {
             ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
-            bean.updateExternalView(externalView, idealState);
+            bean.updateResource(externalView, idealState);
             registerResources(Arrays.asList(bean));
           }
         }
       }
       ResourceMonitor bean = _resourceMbeanMap.get(resourceName);
       String oldSensorName = bean.getSensorName();
-      bean.updateExternalView(externalView, idealState);
+      bean.updateResource(externalView, idealState);
       String newSensorName = bean.getSensorName();
       if (!oldSensorName.equals(newSensorName)) {
         unregisterResources(Arrays.asList(resourceName));
         registerResources(Arrays.asList(bean));
       }
     } catch (Exception e) {
-      LOG.warn(e);
+      LOG.error("Fail to set resource status, resource: " + idealState.getResourceName(), e);
     }
   }
 
@@ -264,18 +352,15 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       }
       _instanceMsgQueueMbeanMap.get(instanceName).addMessageQueueSize(msgQueueSize);
     } catch (Exception e) {
-      LOG.warn("fail to add message queue size to mbean", e);
+      LOG.error("Fail to add message queue size to mbean, instance: " + instanceName, e);
     }
   }
 
   public void reset() {
-    LOG.info("Resetting ClusterStatusMonitor");
+    LOG.info("Reset ClusterStatusMonitor");
     try {
-      for (String resourceName : _resourceMbeanMap.keySet()) {
-        String beanName =
-            CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
-        unregister(getObjectName(beanName));
-      }
+      unregisterResources(_resourceMbeanMap.keySet());
+
       _resourceMbeanMap.clear();
 
       for (MessageQueueMonitor bean : _instanceMsgQueueMbeanMap.values()) {
@@ -286,9 +371,10 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       unregisterInstances(_instanceMbeanMap.keySet());
       _instanceMbeanMap.clear();
 
-      unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
+      unregisterPerInstanceResources(_perInstanceResourceMap.keySet());
+      unregister(getObjectName(clusterBeanName()));
     } catch (Exception e) {
-      LOG.error("fail to reset ClusterStatusMonitor", e);
+      LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + _clusterName, e);
     }
   }
 
@@ -330,12 +416,60 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     _resourceMbeanMap.keySet().removeAll(resources);
   }
 
+  private synchronized void registerPerInstanceResources(
+      Collection<PerInstanceResourceMonitor> monitors) throws MalformedObjectNameException {
+    for (PerInstanceResourceMonitor monitor : monitors) {
+      String instanceName = monitor.getInstanceName();
+      String resourceName = monitor.getResourceName();
+      String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
+      register(monitor, getObjectName(beanName));
+      _perInstanceResourceMap.put(new PerInstanceResourceMonitor.BeanName(instanceName,
+          resourceName), monitor);
+    }
+  }
+
+  private synchronized void unregisterPerInstanceResources(
+      Collection<PerInstanceResourceMonitor.BeanName> beanNames)
+      throws MalformedObjectNameException {
+    for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
+      unregister(getObjectName(getPerInstanceResourceBeanName(beanName.instanceName(),
+          beanName.resourceName())));
+    }
+    _perInstanceResourceMap.keySet().removeAll(beanNames);
+  }
+
+  public String clusterBeanName() {
+    return String.format("%s=%s", CLUSTER_DN_KEY, _clusterName);
+  }
+
+  /**
+   * Build instance bean name
+   * @param instanceName
+   * @return instance bean name
+   */
   private String getInstanceBeanName(String instanceName) {
-    return CLUSTER_DN_KEY + "=" + _clusterName + "," + INSTANCE_DN_KEY + "=" + instanceName;
+    return String.format("%s,%s=%s", clusterBeanName(), INSTANCE_DN_KEY, instanceName);
   }
 
+  /**
+   * Build resource bean name
+   * @param resourceName
+   * @return resource bean name
+   */
   private String getResourceBeanName(String resourceName) {
-    return CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "=" + resourceName;
+    return String.format("%s,%s=%s", clusterBeanName(), RESOURCE_DN_KEY, resourceName);
+  }
+
+  /**
+   * Build per-instance resource bean name:
+   * "cluster={clusterName},instanceName={instanceName},resourceName={resourceName}"
+   * @param instanceName
+   * @param resourceName
+   * @return per-instance resource bean name
+   */
+  public String getPerInstanceResourceBeanName(String instanceName, String resourceName) {
+    return String.format("%s,%s", clusterBeanName(), new PerInstanceResourceMonitor.BeanName(
+        instanceName, resourceName).toString());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index 1385568..d9875cc 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -54,8 +54,8 @@ public class InstanceMonitor implements InstanceMonitorMBean {
 
   @Override
   public String getSensorName() {
-    return ClusterStatusMonitor.PARTICIPANT_STATUS_KEY + "." + _clusterName + "."
-        + serializedTags() + "." + _participantName;
+    return String.format("%s.%s.%s.%s", ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
+        serializedTags(), _participantName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
new file mode 100644
index 0000000..476445c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
@@ -0,0 +1,147 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.model.StateModelDefinition;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public class PerInstanceResourceMonitor implements PerInstanceResourceMonitorMBean {
+  public static class BeanName {
+    private final String _instanceName;
+    private final String _resourceName;
+
+    public BeanName(String instanceName, String resourceName) {
+      if (instanceName == null || resourceName == null) {
+        throw new NullPointerException("Illegal beanName. instanceName: " + instanceName
+            + ", resourceName: " + resourceName);
+      }
+      _instanceName = instanceName;
+      _resourceName = resourceName;
+    }
+
+    public String instanceName() {
+      return _instanceName;
+    }
+
+    public String resourceName() {
+      return _resourceName;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || !(obj instanceof BeanName)) {
+        return false;
+      }
+
+      BeanName that = (BeanName) obj;
+      return _instanceName.equals(that._instanceName) && _resourceName.equals(that._resourceName);
+    }
+
+    @Override
+    public int hashCode() {
+      return toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%s=%s,%s=%s", ClusterStatusMonitor.INSTANCE_DN_KEY, _instanceName,
+          ClusterStatusMonitor.RESOURCE_DN_KEY, _resourceName);
+    }
+  }
+
+  private final String _clusterName;
+  private List<String> _tags;
+  private final String _participantName;
+  private final String _resourceName;
+  private long _partitions;
+
+  public PerInstanceResourceMonitor(String clusterName, String participantName, String resourceName) {
+    _clusterName = clusterName;
+    _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+    _participantName = participantName;
+    _resourceName = resourceName;
+    _partitions = 0;
+  }
+
+  @Override
+  public String getSensorName() {
+    return Joiner
+        .on('.')
+        .join(
+            ImmutableList.of(ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
+                serializedTags(), _participantName, _resourceName)).toString();
+  }
+
+  private String serializedTags() {
+    return Joiner.on('|').skipNulls().join(_tags).toString();
+  }
+
+  @Override
+  public long getPartitionGauge() {
+    return _partitions;
+  }
+
+  public String getInstanceName() {
+    return _participantName;
+  }
+
+  public String getResourceName() {
+    return _resourceName;
+  }
+
+  /**
+   * Update per-instance resource bean
+   * @param stateMap partition->state
+   * @param tags instance tags
+   * @param stateModelDef
+   */
+  public synchronized void update(Map<PartitionId, State> stateMap, Set<String> tags,
+      StateModelDefinition stateModelDef) {
+    if (tags == null || tags.isEmpty()) {
+      _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
+    } else {
+      _tags = Lists.newArrayList(tags);
+      Collections.sort(_tags);
+    }
+
+    int cnt = 0;
+    for (State state : stateMap.values()) {
+      // Skip DROPPED and initial state (e.g. OFFLINE)
+      if (state.equals(HelixDefinedState.DROPPED)
+          || state.equals(stateModelDef.getTypedInitialState())) {
+        continue;
+      }
+      cnt++;
+    }
+    _partitions = cnt;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
new file mode 100644
index 0000000..4b544b1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
@@ -0,0 +1,34 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.monitoring.SensorNameProvider;
+
+/**
+ * A bean that describes the resource on each instance
+ */
+public interface PerInstanceResourceMonitorMBean extends SensorNameProvider {
+  /**
+   * Get the number of partitions of the resource that are assigned to the instance
+   * in the best possible ideal state
+   * @return number of partitions
+   */
+  long getPartitionGauge();
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index d1ba595..4739fab 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -19,6 +19,7 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.helix.HelixDefinedState;
@@ -30,14 +31,15 @@ import org.apache.helix.model.IdealState;
 import org.apache.log4j.Logger;
 
 public class ResourceMonitor implements ResourceMonitorMBean {
-  private int _numOfPartitions;
-  int _numOfPartitionsInExternalView;
-  int _numOfErrorPartitions;
-  int _externalViewIdealStateDiff;
-  String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private static final Logger LOG = Logger.getLogger(ResourceMonitor.class);
 
-  String _resourceName, _clusterName;
+  private int _numOfPartitions;
+  private int _numOfPartitionsInExternalView;
+  private int _numOfErrorPartitions;
+  private int _externalViewIdealStateDiff;
+  private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
+  private String _resourceName;
+  private String _clusterName;
 
   public ResourceMonitor(String clusterName, String resourceName) {
     _clusterName = clusterName;
@@ -61,15 +63,15 @@ public class ResourceMonitor implements ResourceMonitorMBean {
 
   @Override
   public String getSensorName() {
-    return ClusterStatusMonitor.RESOURCE_STATUS_KEY + "." + _clusterName + "." + _tag + "."
-        + _resourceName;
+    return String.format("%s.%s.%s.%s", ClusterStatusMonitor.RESOURCE_STATUS_KEY, _clusterName,
+        _tag, _resourceName);
   }
 
   public String getResourceName() {
     return _resourceName;
   }
 
-  public void updateExternalView(ExternalView externalView, IdealState idealState) {
+  public void updateResource(ExternalView externalView, IdealState idealState) {
     if (externalView == null) {
       LOG.warn("external view is null");
       return;
@@ -97,6 +99,9 @@ public class ResourceMonitor implements ResourceMonitorMBean {
     // or list fields (AUDO mode)
     for (PartitionId partitionId : idealState.getPartitionIdSet()) {
       Map<ParticipantId, State> idealRecord = idealState.getParticipantStateMap(partitionId);
+      if (idealRecord == null) {
+        idealRecord = Collections.emptyMap();
+      }
       Map<ParticipantId, State> externalViewRecord = externalView.getStateMap(partitionId);
 
       if (externalViewRecord == null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index a00db67..0981a2e 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -37,12 +37,14 @@ import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
+  private static final Logger LOG = Logger.getLogger(TestClusterStatusMonitorLifecycle.class);
 
   MockParticipantManager[] _participants;
   ClusterDistributedController[] _controllers;
@@ -176,12 +178,14 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     @Override
     public void onMBeanRegistered(MBeanServerConnection server,
         MBeanServerNotification mbsNotification) {
+      LOG.info("Register mbean: " + mbsNotification.getMBeanName());
       _nMbeansRegistered++;
     }
 
     @Override
     public void onMBeanUnRegistered(MBeanServerConnection server,
         MBeanServerNotification mbsNotification) {
+      LOG.info("Unregister mbean: " + mbsNotification.getMBeanName());
       _nMbeansUnregistered++;
     }
   }
@@ -196,10 +200,12 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
 
     _participants[0].disconnect();
 
-    // participant goes away. should be no change in number of beans as config is still present
+    // 1 participant goes away
+    // No change in instance/resource mbean
+    // Unregister 1 per-instance resource mbean
     Thread.sleep(1000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 1);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered);
 
     HelixDataAccessor accessor = _participants[n - 1].getHelixDataAccessor();
     String firstControllerName =
@@ -215,19 +221,25 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
     Thread.sleep(1000);
 
     // 1 cluster status monitor, 1 resource monitor, 5 instances
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 7);
+    // Unregister 1+4+1 per-instance resource mbean
+    // Register 4 per-instance resource mbean
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 11);
 
     String instanceName = "localhost0_" + (12918 + 0);
     _participants[0] = new MockParticipantManager(ZK_ADDR, _firstClusterName, instanceName);
     _participants[0].syncStart();
 
-    // participant goes back. should be no change
+    // 1 participant comes back
+    // No change in instance/resource mbean
+    // Register 1 per-instance resource mbean
     Thread.sleep(1000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 7);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 12);
 
-    // Add a resource, one more mbean registered
+    // Add a resource
+    // Register 1 resource mbean
+    // Register 5 per-instance resource mbean
     ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
     IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates("TestDB00"));
 
@@ -237,14 +249,16 @@ public class TestClusterStatusMonitorLifecycle extends ZkIntegrationTestBase {
         Integer.parseInt(idealState.getReplicas()));
 
     Thread.sleep(1000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 8);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 13);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
 
-    // remove resource, no change
+    // Remove a resource
+    // No change in instance/resource mbean
+    // Unregister 5 per-instance resource mbean
     setupTool.dropResourceFromCluster(_firstClusterName, "TestDB1");
     Thread.sleep(1000);
-    Assert.assertEquals(nMbeansUnregistered, listener._nMbeansUnregistered - 7);
-    Assert.assertEquals(nMbeansRegistered, listener._nMbeansRegistered - 8);
+    Assert.assertTrue(nMbeansUnregistered == listener._nMbeansUnregistered - 18);
+    Assert.assertTrue(nMbeansRegistered == listener._nMbeansRegistered - 18);
 
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index facb4ea..8c9ab01 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -19,98 +19,162 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import java.util.ArrayList;
+import java.lang.management.ManagementFactory;
 import java.util.Date;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.Mocks;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
-import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import java.util.Map;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.SemiAutoRebalancerConfig;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Maps;
+
 public class TestClusterStatusMonitor {
-  List<String> _instances;
-  List<ZNRecord> _liveInstances;
-  String _db = "DB";
-  String _db2 = "TestDB";
-  int _replicas = 3;
-  int _partitions = 50;
-  ZNRecord _externalView, _externalView2;
-
-  class MockDataAccessor extends Mocks.MockAccessor {
-    public MockDataAccessor() {
-      _instances = new ArrayList<String>();
-      for (int i = 0; i < 5; i++) {
-        String instance = "localhost_" + (12918 + i);
-        _instances.add(instance);
-      }
-      ZNRecord externalView =
-          DefaultTwoStateStrategy.calculateIdealState(_instances, _partitions, _replicas, _db,
-              "MASTER", "SLAVE");
-
-      ZNRecord externalView2 =
-          DefaultTwoStateStrategy.calculateIdealState(_instances, 80, 2, _db2, "MASTER", "SLAVE");
-    }
+  private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
+
+  @Test()
+  public void testReportData() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 5;
+    String testDB = "TestDB";
+    String testDB_0 = testDB + "_0";
 
-    public ZNRecord getProperty(PropertyType type, String resource) {
-      if (type == PropertyType.IDEALSTATES || type == PropertyType.EXTERNALVIEW) {
-        if (resource.equals(_db)) {
-          return _externalView;
-        } else if (resource.equals(_db2)) {
-          return _externalView2;
-        }
-      }
-      return null;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+    ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+    try {
+      _server.getMBeanInfo(clusterMonitorObjName);
+    } catch (Exception e) {
+      Assert.fail("Fail to register ClusterStatusMonitor");
     }
-  }
 
-  class MockHelixManager extends Mocks.MockManager {
-    MockDataAccessor _accessor = new MockDataAccessor();
+    // Test #setPerInstanceResourceStatus()
+    BestPossibleStateOutput bestPossibleStates = new BestPossibleStateOutput();
+    ResourceAssignment assignment = new ResourceAssignment(ResourceId.from(testDB));
+    Map<ParticipantId, State> replicaMap = Maps.newHashMap();
+    replicaMap.put(ParticipantId.from("localhost_12918"), State.from("MASTER"));
+    replicaMap.put(ParticipantId.from("localhost_12919"), State.from("SLAVE"));
+    replicaMap.put(ParticipantId.from("localhost_12920"), State.from("SLAVE"));
+    replicaMap.put(ParticipantId.from("localhost_12921"), State.from("OFFLINE"));
+    replicaMap.put(ParticipantId.from("localhost_12922"), State.from("DROPPED"));
+    assignment.addReplicaMap(PartitionId.from(testDB_0), replicaMap);
+    bestPossibleStates.setResourceAssignment(ResourceId.from(testDB), assignment);
 
-    @Override
-    public HelixDataAccessor getHelixDataAccessor() {
-      return _accessor;
+    Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap();
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      InstanceConfig config = new InstanceConfig(instanceName);
+      instanceConfigMap.put(instanceName, config);
     }
 
-  }
+    Map<ResourceId, ResourceConfig> resourceMap = Maps.newHashMap();
+    ResourceId resourceId = ResourceId.from(testDB);
+    RebalancerConfig rebalancerConfig =
+        new SemiAutoRebalancerConfig.Builder(resourceId)
+            .addPartition(new Partition(PartitionId.from(testDB_0)))
+            .stateModelDefId(StateModelDefId.from("MasterSlave")).build();
+    ResourceConfig resourceConfig =
+        new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalancerConfig).build();
+    resourceMap.put(resourceId, resourceConfig);
 
-  @Test()
-  public void TestReportData() {
-    System.out.println("START TestClusterStatusMonitor at" + new Date(System.currentTimeMillis()));
-    List<String> _instances;
-    List<ZNRecord> _liveInstances = new ArrayList<ZNRecord>();
-    String _db = "DB";
-    int _replicas = 3;
-    int _partitions = 50;
-
-    _instances = new ArrayList<String>();
-    for (int i = 0; i < 5; i++) {
-      String instance = "localhost_" + (12918 + i);
-      _instances.add(instance);
-      ZNRecord metaData = new ZNRecord(instance);
-      metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(), UUID.randomUUID()
-          .toString());
-      _liveInstances.add(metaData);
+    Map<String, StateModelDefinition> stateModelDefMap = Maps.newHashMap();
+    StateModelDefinition msStateModelDef =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    stateModelDefMap.put("MasterSlave", msStateModelDef);
+
+    monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
+        stateModelDefMap);
+
+    // localhost_12918 should have 1 partition because it's MASTER
+    ObjectName objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+    Object value = _server.getAttribute(objName, "PartitionGauge");
+    Assert.assertTrue(value instanceof Long);
+    Assert.assertEquals((Long) value, new Long(1));
+    value = _server.getAttribute(objName, "SensorName");
+    Assert.assertTrue(value instanceof String);
+    Assert.assertEquals((String) value, String.format("%s.%s.%s.%s.%s",
+        ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, clusterName, ClusterStatusMonitor.DEFAULT_TAG,
+        "localhost_12918", testDB));
+
+    // localhost_12919 should have 1 partition because it's SLAVE
+    objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12919", testDB));
+    value = _server.getAttribute(objName, "PartitionGauge");
+    Assert.assertTrue(value instanceof Long);
+    Assert.assertEquals((Long) value, new Long(1));
+
+    // localhost_12921 should have 0 partition because it's OFFLINE
+    objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12921", testDB));
+    value = _server.getAttribute(objName, "PartitionGauge");
+    Assert.assertTrue(value instanceof Long);
+    Assert.assertEquals((Long) value, new Long(0));
+
+    // localhost_12922 should have 0 partition because it's DROPPED
+    objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12922", testDB));
+    value = _server.getAttribute(objName, "PartitionGauge");
+    Assert.assertTrue(value instanceof Long);
+    Assert.assertEquals((Long) value, new Long(0));
+
+    // Missing localhost_12918 in best possible ideal-state should remove it from mbean
+    replicaMap.remove(ParticipantId.from("localhost_12918"));
+    assignment.addReplicaMap(PartitionId.from(testDB_0), replicaMap);
+    bestPossibleStates.setResourceAssignment(ResourceId.from(testDB), assignment);
+    monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
+        stateModelDefMap);
+    try {
+      objName =
+          monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+      _server.getMBeanInfo(objName);
+      Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12918");
+
+    } catch (InstanceNotFoundException e) {
+      // OK
     }
-    ZNRecord externalView =
-        DefaultTwoStateStrategy.calculateIdealState(_instances, _partitions, _replicas, _db,
-            "MASTER", "SLAVE");
 
-    ZNRecord externalView2 =
-        DefaultTwoStateStrategy.calculateIdealState(_instances, 80, 2, "TestDB", "MASTER", "SLAVE");
+    // Clean up
+    monitor.reset();
+
+    try {
+      objName =
+          monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12920", testDB));
+      _server.getMBeanInfo(objName);
+      Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12920");
 
-    List<ZNRecord> externalViews = new ArrayList<ZNRecord>();
-    externalViews.add(externalView);
-    externalViews.add(externalView2);
+    } catch (InstanceNotFoundException e) {
+      // OK
+    }
+
+    try {
+      _server.getMBeanInfo(clusterMonitorObjName);
+      Assert.fail("Fail to unregister ClusterStatusMonitor");
+    } catch (InstanceNotFoundException e) {
+      // OK
+    }
 
-    ClusterStatusMonitor monitor = new ClusterStatusMonitor("cluster1");
-    MockHelixManager manager = new MockHelixManager();
-    NotificationContext context = new NotificationContext(manager);
-    System.out.println("END TestClusterStatusMonitor at" + new Date(System.currentTimeMillis()));
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/96aef71c/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index e8bb4b6..dcca755 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -28,14 +28,13 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.Mocks;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.strategy.DefaultTwoStateStrategy;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
-import org.testng.AssertJUnit;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestResourceMonitor {
@@ -106,46 +105,52 @@ public class TestResourceMonitor {
   }
 
   @Test()
-  public void TestReportData() {
-    MockHelixManager manager = new MockHelixManager();
+  public void testReportData() {
+    final int n = 5;
     ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName);
 
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    ExternalView externalView = helixDataAccessor.getProperty(keyBuilder.externalView(_dbName));
-    IdealState idealState = helixDataAccessor.getProperty(keyBuilder.idealStates(_dbName));
+    List<String> instances = new ArrayList<String>();
+    for (int i = 0; i < n; i++) {
+      String instance = "localhost_" + (12918 + i);
+      instances.add(instance);
+    }
 
-    monitor.updateExternalView(externalView, idealState);
+    ZNRecord idealStateRecord =
+        DefaultTwoStateStrategy.calculateIdealState(instances, _partitions, _replicas, _dbName,
+            "MASTER", "SLAVE");
+    IdealState idealState = new IdealState(idealStateRecord);
+    ExternalView externalView = new ExternalView(idealStateRecord);
 
-    AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
-    AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), 0);
-    AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
-    AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
-    monitor.getBeanName();
+    monitor.updateResource(externalView, idealState);
 
-    int n = 4;
-    for (int i = 0; i < n; i++) {
+    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+    Assert.assertEquals(monitor.getErrorPartitionGauge(), 0);
+    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
+    // monitor.getBeanName();
+
+    final int m = n - 1;
+    for (int i = 0; i < m; i++) {
       Map<String, String> map = externalView.getStateMap(_dbName + "_" + 3 * i);
       String key = map.keySet().toArray()[0].toString();
       map.put(key, "ERROR");
       externalView.setStateMap(_dbName + "_" + 3 * i, map);
     }
 
-    monitor.updateExternalView(externalView, idealState);
-    AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
-    AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), n);
-    AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
-    AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
+    monitor.updateResource(externalView, idealState);
+    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), 0);
+    Assert.assertEquals(monitor.getErrorPartitionGauge(), m);
+    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions);
+    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
 
-    n = 5;
     for (int i = 0; i < n; i++) {
       externalView.getRecord().getMapFields().remove(_dbName + "_" + 4 * i);
     }
 
-    monitor.updateExternalView(externalView, idealState);
-    AssertJUnit.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
-    AssertJUnit.assertEquals(monitor.getErrorPartitionGauge(), 3);
-    AssertJUnit.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
-    AssertJUnit.assertEquals(monitor.getPartitionGauge(), _partitions);
+    monitor.updateResource(externalView, idealState);
+    Assert.assertEquals(monitor.getDifferenceWithIdealStateGauge(), n * (_replicas + 1));
+    Assert.assertEquals(monitor.getErrorPartitionGauge(), 3);
+    Assert.assertEquals(monitor.getExternalViewPartitionGauge(), _partitions - n);
+    Assert.assertEquals(monitor.getPartitionGauge(), _partitions);
   }
 }