You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/01/15 07:28:23 UTC

[helix] branch wagedRebalancer updated: Add instance capacity gauge (#557)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 7dda679  Add instance capacity gauge (#557)
7dda679 is described below

commit 7dda679a89af8b98c449e025049f3537b7b0de11
Author: Huizhi L <ih...@gmail.com>
AuthorDate: Tue Jan 14 23:28:08 2020 -0800

    Add instance capacity gauge (#557)
    
    We need to monitor instance utilization in order to understand what the instance capacity is.
    
    Change list:
    - Change instance monitor to update capacity
    - Change getAttribute to throw AttributeNotFoundException in DynamicMBeanProvider
    - Combine max usage and instance capacity update into one method in cluster status monitor
    - Add unit test
---
 .../stages/CurrentStateComputationStage.java       |  13 ++-
 .../monitoring/mbeans/ClusterStatusMonitor.java    |  31 +++----
 .../helix/monitoring/mbeans/InstanceMonitor.java   |  86 +++++++++++++-----
 .../mbeans/dynamicMBeans/DynamicMBeanProvider.java |  78 ++++++++--------
 .../mbeans/TestClusterStatusMonitor.java           | 100 +++++++++++++++++----
 .../mbeans/TestRoutingTableProviderMonitor.java    |  10 ++-
 .../monitoring/mbeans/TestZkClientMonitor.java     |   9 +-
 7 files changed, 224 insertions(+), 103 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 9ad8fcf..727804b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -21,7 +21,6 @@ package org.apache.helix.controller.stages;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -249,7 +248,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
         // Only use the resources in ideal states to parse all replicas.
         Map<String, IdealState> idealStateMap = dataProvider.getIdealStates();
         Map<String, Resource> resourceToMonitorMap = resourceMap.entrySet().stream()
-            .filter(resourceName -> idealStateMap.containsKey(resourceName))
+            .filter(idealStateMap::containsKey)
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
         Map<String, ResourceAssignment> currentStateAssignment =
@@ -257,16 +256,16 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
         ClusterModel clusterModel = ClusterModelProvider.generateClusterModelFromExistingAssignment(
             dataProvider, resourceToMonitorMap, currentStateAssignment);
 
-        Map<String, Double> maxUsageMap = new HashMap<>();
         for (AssignableNode node : clusterModel.getAssignableNodes().values()) {
           String instanceName = node.getInstanceName();
+          // There is no new usage adding to this node, so an empty map is passed in.
           double usage = node.getProjectedHighestUtilization(Collections.emptyMap());
-          maxUsageMap.put(instanceName, usage);
+          clusterStatusMonitor
+              .updateInstanceCapacityStatus(instanceName, usage, node.getMaxCapacity());
         }
-
-        clusterStatusMonitor.updateInstanceMaxUsage(maxUsageMap);
       } catch (Exception ex) {
-        LOG.error("Failed to report instance capacity metrics.", ex);
+        LOG.error("Failed to report instance capacity metrics. Exception message: {}",
+            ex.getMessage());
       }
 
       return null;
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 15fdbcd..96f85bf 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -367,24 +367,25 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   /**
-   * Update max capacity usage for per instance. Before calling this API, we assume the instance
-   * monitors are already registered in ReadClusterDataStage. If the monitor is not registered, this
-   * max usage update will fail.
+   * Updates instance capacity status for per instance, including max usage and capacity of each
+   * capacity key. Before calling this API, we assume the instance monitors are already registered
+   * in ReadClusterDataStage. If the monitor is not registered, this instance capacity status update
+   * will fail.
    *
-   * @param maxUsageMap a map of max capacity usage, {instance: maxCapacityUsage}
+   * @param instanceName This instance name
+   * @param maxUsage Max capacity usage of this instance
+   * @param capacityMap A map of this instance capacity, {capacity key: capacity value}
    */
-  public void updateInstanceMaxUsage(Map<String, Double> maxUsageMap) {
-    synchronized (_instanceMonitorMap) {
-      for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
-        InstanceMonitor monitor = _instanceMonitorMap.get(entry.getKey());
-        if (monitor == null) {
-          LOG.warn("Failed to update max usage because instance monitor is not found, instance: {}.",
-              entry.getKey());
-          continue;
-        }
-        monitor.updateMaxCapacityUsage(entry.getValue());
-      }
+  public void updateInstanceCapacityStatus(String instanceName, double maxUsage,
+      Map<String, Integer> capacityMap) {
+    InstanceMonitor monitor = _instanceMonitorMap.get(instanceName);
+    if (monitor == null) {
+      LOG.warn("Failed to update instance capacity status because instance monitor is not found, "
+          + "instance: {}.", instanceName);
+      return;
     }
+    monitor.updateMaxCapacityUsage(maxUsage);
+    monitor.updateCapacity(capacityMap);
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index b93d3b9..e0c0f89 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.management.JMException;
 import javax.management.ObjectName;
 
@@ -41,7 +42,7 @@ public class InstanceMonitor extends DynamicMBeanProvider {
   /**
    * Metric names for instance capacity.
    */
-  public enum InstanceMonitorMetrics {
+  public enum InstanceMonitorMetric {
     // TODO: change the metric names with Counter and Gauge suffix and deprecate old names.
     TOTAL_MESSAGE_RECEIVED_COUNTER("TotalMessageReceived"),
     ENABLED_STATUS_GAUGE("Enabled"),
@@ -49,9 +50,9 @@ public class InstanceMonitor extends DynamicMBeanProvider {
     DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
     MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge");
 
-    private String metricName;
+    private final String metricName;
 
-    InstanceMonitorMetrics(String name) {
+    InstanceMonitorMetric(String name) {
       metricName = name;
     }
 
@@ -75,6 +76,9 @@ public class InstanceMonitor extends DynamicMBeanProvider {
   private SimpleDynamicMetric<Long> _onlineStatusGauge;
   private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;
 
+  // A map of dynamic capacity Gauges. The map's keys could change.
+  private final Map<String, SimpleDynamicMetric<Long>> _dynamicCapacityMetricsMap;
+
   /**
    * Initialize the bean
    * @param clusterName the cluster to monitor
@@ -85,26 +89,41 @@ public class InstanceMonitor extends DynamicMBeanProvider {
     _participantName = participantName;
     _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
     _initObjectName = objectName;
+    _dynamicCapacityMetricsMap = new ConcurrentHashMap<>();
 
     createMetrics();
   }
 
   private void createMetrics() {
     _totalMessagedReceivedCounter = new SimpleDynamicMetric<>(
-        InstanceMonitorMetrics.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);
+        InstanceMonitorMetric.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);
 
     _disabledPartitionsGauge =
-        new SimpleDynamicMetric<>(InstanceMonitorMetrics.DISABLED_PARTITIONS_GAUGE.metricName(),
+        new SimpleDynamicMetric<>(InstanceMonitorMetric.DISABLED_PARTITIONS_GAUGE.metricName(),
             0L);
     _enabledStatusGauge =
-        new SimpleDynamicMetric<>(InstanceMonitorMetrics.ENABLED_STATUS_GAUGE.metricName(), 0L);
+        new SimpleDynamicMetric<>(InstanceMonitorMetric.ENABLED_STATUS_GAUGE.metricName(), 0L);
     _onlineStatusGauge =
-        new SimpleDynamicMetric<>(InstanceMonitorMetrics.ONLINE_STATUS_GAUGE.metricName(), 0L);
+        new SimpleDynamicMetric<>(InstanceMonitorMetric.ONLINE_STATUS_GAUGE.metricName(), 0L);
     _maxCapacityUsageGauge =
-        new SimpleDynamicMetric<>(InstanceMonitorMetrics.MAX_CAPACITY_USAGE_GAUGE.metricName(),
+        new SimpleDynamicMetric<>(InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName(),
             0.0d);
   }
 
+  private List<DynamicMetric<?, ?>> buildAttributeList() {
+    List<DynamicMetric<?, ?>> attributeList = Lists.newArrayList(
+        _totalMessagedReceivedCounter,
+        _disabledPartitionsGauge,
+        _enabledStatusGauge,
+        _onlineStatusGauge,
+        _maxCapacityUsageGauge
+    );
+
+    attributeList.addAll(_dynamicCapacityMetricsMap.values());
+
+    return attributeList;
+  }
+
   @Override
   public String getSensorName() {
     return String.format("%s.%s.%s.%s", ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
@@ -183,7 +202,7 @@ public class InstanceMonitor extends DynamicMBeanProvider {
   }
 
   /**
-   * Update max capacity usage for this instance.
+   * Updates max capacity usage for this instance.
    * @param maxUsage max capacity usage of this instance
    */
   public synchronized void updateMaxCapacityUsage(double maxUsage) {
@@ -191,25 +210,50 @@ public class InstanceMonitor extends DynamicMBeanProvider {
   }
 
   /**
-   * Get max capacity usage of this instance.
+   * Gets max capacity usage of this instance.
    * @return Max capacity usage of this instance.
    */
   protected synchronized double getMaxCapacityUsageGauge() {
     return _maxCapacityUsageGauge.getValue();
   }
 
-  @Override
-  public DynamicMBeanProvider register()
-      throws JMException {
-    List<DynamicMetric<?, ?>> attributeList = ImmutableList.of(
-        _totalMessagedReceivedCounter,
-        _disabledPartitionsGauge,
-        _enabledStatusGauge,
-        _onlineStatusGauge,
-        _maxCapacityUsageGauge
-    );
+  /**
+   * Updates instance capacity metrics.
+   * @param capacity A map of instance capacity.
+   */
+  public void updateCapacity(Map<String, Integer> capacity) {
+    synchronized (_dynamicCapacityMetricsMap) {
+      // If capacity keys don't have any change, we just update the metric values.
+      if (_dynamicCapacityMetricsMap.keySet().equals(capacity.keySet())) {
+        for (Map.Entry<String, Integer> entry : capacity.entrySet()) {
+          _dynamicCapacityMetricsMap.get(entry.getKey()).updateValue((long) entry.getValue());
+        }
+        return;
+      }
 
-    doRegister(attributeList, _initObjectName);
+      // If capacity keys have any changes, we need to retain the capacity metrics.
+      // Make sure capacity metrics map has the same capacity keys.
+      // And update metrics values.
+      _dynamicCapacityMetricsMap.keySet().retainAll(capacity.keySet());
+      for (Map.Entry<String, Integer> entry : capacity.entrySet()) {
+        String capacityName = entry.getKey();
+        if (_dynamicCapacityMetricsMap.containsKey(capacityName)) {
+          _dynamicCapacityMetricsMap.get(capacityName).updateValue((long) entry.getValue());
+        } else {
+          _dynamicCapacityMetricsMap.put(capacityName,
+              new SimpleDynamicMetric<>(capacityName + "Gauge", (long) entry.getValue()));
+        }
+      }
+    }
+
+    // Update MBean's all attributes.
+    updateAttributesInfo(buildAttributeList(),
+        "Instance monitor for instance: " + getInstanceName());
+  }
+
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    doRegister(buildAttributeList(), _initObjectName);
 
     return this;
   }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
index 0ce0b44..407a714 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
@@ -22,23 +22,19 @@ package org.apache.helix.monitoring.mbeans.dynamicMBeans;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.management.Attribute;
 import javax.management.AttributeList;
 import javax.management.AttributeNotFoundException;
 import javax.management.DynamicMBean;
-import javax.management.InvalidAttributeValueException;
 import javax.management.JMException;
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanConstructorInfo;
-import javax.management.MBeanException;
 import javax.management.MBeanInfo;
 import javax.management.MBeanNotificationInfo;
 import javax.management.MBeanOperationInfo;
 import javax.management.ObjectName;
-import javax.management.ReflectionException;
 
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.monitoring.SensorNameProvider;
@@ -53,12 +49,12 @@ import org.slf4j.LoggerFactory;
 public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNameProvider {
   protected final Logger _logger = LoggerFactory.getLogger(getClass());
   protected static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // Reset time every hour
-  private static String SENSOR_NAME_TAG = "SensorName";
-  private static String DEFAULT_DESCRIPTION =
+  private static final String SENSOR_NAME_TAG = "SensorName";
+  private static final String DEFAULT_DESCRIPTION =
       "Information on the management interface of the MBean";
 
   // Attribute name to the DynamicMetric object mapping
-  private final Map<String, DynamicMetric> _attributeMap = new HashMap<>();
+  private Map<String, DynamicMetric> _attributeMap = new HashMap<>();
   private ObjectName _objectName = null;
   private MBeanInfo _mBeanInfo;
 
@@ -88,7 +84,7 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
           objectName.getCanonicalName());
       return false;
     }
-    updateAttributtInfos(dynamicMetrics, description);
+    updateAttributesInfo(dynamicMetrics, description);
     _objectName = MBeanRegistrar.register(this, objectName);
     return true;
   }
@@ -99,26 +95,30 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
   }
 
   /**
-   * Update the Dynamic MBean provider with new metric list.
+   * Updates the Dynamic MBean provider with new metric list.
+   * If the pass-in metrics collection is empty, the original attributes will be removed.
+   *
    * @param description description of the MBean
-   * @param dynamicMetrics the DynamicMetrics
+   * @param dynamicMetrics the DynamicMetrics. Empty collection will remove the metric attributes.
    */
-  private void updateAttributtInfos(Collection<DynamicMetric<?, ?>> dynamicMetrics,
+  protected void updateAttributesInfo(Collection<DynamicMetric<?, ?>> dynamicMetrics,
       String description) {
-    _attributeMap.clear();
+    if (dynamicMetrics == null) {
+      _logger.warn("Cannot update attributes info because dynamicMetrics is null.");
+      return;
+    }
 
-    // get all attributes that can be emit by the dynamicMetrics.
     List<MBeanAttributeInfo> attributeInfoList = new ArrayList<>();
-    if (dynamicMetrics != null) {
-      for (DynamicMetric dynamicMetric : dynamicMetrics) {
-        Iterator<MBeanAttributeInfo> iter = dynamicMetric.getAttributeInfos().iterator();
-        while (iter.hasNext()) {
-          MBeanAttributeInfo attributeInfo = iter.next();
-          // Info list to create MBean info
-          attributeInfoList.add(attributeInfo);
-          // Attribute mapping for getting attribute value when getAttribute() is called
-          _attributeMap.put(attributeInfo.getName(), dynamicMetric);
-        }
+    // Use a new attribute map to avoid concurrency issue.
+    Map<String, DynamicMetric> newAttributeMap = new HashMap<>();
+
+    // Get all attributes that can be emitted by the dynamicMetrics.
+    for (DynamicMetric<?, ?> dynamicMetric : dynamicMetrics) {
+      for (MBeanAttributeInfo attributeInfo : dynamicMetric.getAttributeInfos()) {
+        // Info list to create MBean info
+        attributeInfoList.add(attributeInfo);
+        // Attribute mapping for getting attribute value when getAttribute() is called
+        newAttributeMap.put(attributeInfo.getName(), dynamicMetric);
       }
     }
 
@@ -130,17 +130,19 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
         String.format("Default %s Constructor", getClass().getSimpleName()),
         getClass().getConstructors()[0]);
 
-    MBeanAttributeInfo[] attributeInfos = new MBeanAttributeInfo[attributeInfoList.size()];
-    attributeInfos = attributeInfoList.toArray(attributeInfos);
+    MBeanAttributeInfo[] attributesInfo = new MBeanAttributeInfo[attributeInfoList.size()];
+    attributesInfo = attributeInfoList.toArray(attributesInfo);
 
     if (description == null) {
       description = DEFAULT_DESCRIPTION;
     }
 
-    _mBeanInfo = new MBeanInfo(getClass().getName(), description, attributeInfos,
-        new MBeanConstructorInfo[] {
-            constructorInfo
-        }, new MBeanOperationInfo[0], new MBeanNotificationInfo[0]);
+    _mBeanInfo = new MBeanInfo(getClass().getName(), description, attributesInfo,
+        new MBeanConstructorInfo[]{constructorInfo}, new MBeanOperationInfo[0],
+        new MBeanNotificationInfo[0]);
+
+    // Update _attributeMap reference.
+    _attributeMap = newAttributeMap;
   }
 
   /**
@@ -158,17 +160,17 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
   }
 
   @Override
-  public Object getAttribute(String attribute)
-      throws AttributeNotFoundException, MBeanException, ReflectionException {
+  public Object getAttribute(String attribute) throws AttributeNotFoundException {
     if (SENSOR_NAME_TAG.equals(attribute)) {
       return getSensorName();
     }
 
-    if (!_attributeMap.containsKey(attribute)) {
-      return null;
+    DynamicMetric metric = _attributeMap.get(attribute);
+    if (metric == null) {
+      throw new AttributeNotFoundException("Attribute[" + attribute + "] is not found.");
     }
 
-    return _attributeMap.get(attribute).getAttributeValue(attribute);
+    return metric.getAttributeValue(attribute);
   }
 
   @Override
@@ -178,7 +180,7 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
       try {
         Object value = getAttribute(attributeName);
         attributeList.add(new Attribute(attributeName, value));
-      } catch (AttributeNotFoundException | MBeanException | ReflectionException ex) {
+      } catch (AttributeNotFoundException ex) {
         _logger.error("Failed to get attribute: " + attributeName, ex);
       }
     }
@@ -191,8 +193,7 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
   }
 
   @Override
-  public void setAttribute(Attribute attribute) throws AttributeNotFoundException,
-      InvalidAttributeValueException, MBeanException, ReflectionException {
+  public void setAttribute(Attribute attribute) {
     // All MBeans are readonly
     return;
   }
@@ -204,8 +205,7 @@ public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNamePr
   }
 
   @Override
-  public Object invoke(String actionName, Object[] params, String[] signature)
-      throws MBeanException, ReflectionException {
+  public Object invoke(String actionName, Object[] params, String[] signature) {
     // No operation supported
     return null;
   }
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index a9e6c24..f4ba01f 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -39,6 +39,7 @@ import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
@@ -62,8 +63,7 @@ public class TestClusterStatusMonitor {
   private String testDB_0 = testDB + "_0";
 
   @Test()
-  public void testReportData()
-      throws Exception {
+  public void testReportData() throws Exception {
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
@@ -166,8 +166,7 @@ public class TestClusterStatusMonitor {
 
 
   @Test
-  public void testResourceAggregation()
-      throws JMException, IOException {
+  public void testResourceAggregation() throws JMException, IOException {
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
@@ -315,14 +314,20 @@ public class TestClusterStatusMonitor {
   }
 
   @Test
-  public void testUpdateMaxCapacityUsage()
+  public void testUpdateInstanceCapacityStatus()
       throws MalformedObjectNameException, IOException, AttributeNotFoundException, MBeanException,
              ReflectionException, InstanceNotFoundException {
     String clusterName = "testCluster";
     List<Double> maxUsageList = ImmutableList.of(0.0d, 0.32d, 0.85d, 1.0d, 0.50d, 0.75d);
     Map<String, Double> maxUsageMap = new HashMap<>();
+    Map<String, Map<String, Integer>> instanceCapacityMap = new HashMap<>();
+    Random rand = new Random();
+
     for (int i = 0; i < maxUsageList.size(); i++) {
-      maxUsageMap.put("instance" + i, maxUsageList.get(i));
+      String instanceName = "instance" + i;
+      maxUsageMap.put(instanceName, maxUsageList.get(i));
+      instanceCapacityMap.put(instanceName,
+          ImmutableMap.of("capacity1", rand.nextInt(100), "capacity2", rand.nextInt(100)));
     }
 
     // Setup cluster status monitor.
@@ -330,13 +335,15 @@ public class TestClusterStatusMonitor {
     monitor.active();
     ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
 
+    // Cluster status monitor is registered.
     Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
 
     // Before calling setClusterInstanceStatus, instance monitors are not yet registered.
     for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
       String instance = entry.getKey();
-      String instanceBeanName =
-          String.format("%s,%s=%s", monitor.clusterBeanName(), monitor.INSTANCE_DN_KEY, instance);
+      String instanceBeanName = String
+          .format("%s,%s=%s", monitor.clusterBeanName(), ClusterStatusMonitor.INSTANCE_DN_KEY,
+              instance);
       ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
 
       Assert.assertFalse(_server.isRegistered(instanceObjectName));
@@ -346,31 +353,86 @@ public class TestClusterStatusMonitor {
     monitor.setClusterInstanceStatus(maxUsageMap.keySet(), maxUsageMap.keySet(),
         Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap(),
         Collections.emptyMap());
-    // Update max usage stats.
-    monitor.updateInstanceMaxUsage(maxUsageMap);
 
-    // Verify results.
-    for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
-      String instance = entry.getKey();
-      double usage = entry.getValue();
-      String instanceBeanName =
-          String.format("%s,%s=%s", monitor.clusterBeanName(), monitor.INSTANCE_DN_KEY, instance);
+    // Update instance capacity status.
+    for (Map.Entry<String, Double> usageEntry : maxUsageMap.entrySet()) {
+      String instanceName = usageEntry.getKey();
+      monitor.updateInstanceCapacityStatus(instanceName, usageEntry.getValue(),
+          instanceCapacityMap.get(instanceName));
+    }
+
+    verifyCapacityMetrics(monitor, maxUsageMap, instanceCapacityMap);
+
+    // Change capacity keys: "capacity2" -> "capacity3"
+    for (String instanceName : instanceCapacityMap.keySet()) {
+      instanceCapacityMap.put(instanceName,
+          ImmutableMap.of("capacity1", rand.nextInt(100), "capacity3", rand.nextInt(100)));
+    }
+
+    // Update instance capacity status.
+    for (Map.Entry<String, Double> usageEntry : maxUsageMap.entrySet()) {
+      String instanceName = usageEntry.getKey();
+      monitor.updateInstanceCapacityStatus(instanceName, usageEntry.getValue(),
+          instanceCapacityMap.get(instanceName));
+    }
+
+    // "capacity2" metric should not exist in MBean server.
+    String removedAttribute = "capacity2Gauge";
+    for (Map.Entry<String, Map<String, Integer>> instanceEntry : instanceCapacityMap.entrySet()) {
+      String instance = instanceEntry.getKey();
+      String instanceBeanName = String
+          .format("%s,%s=%s", monitor.clusterBeanName(), ClusterStatusMonitor.INSTANCE_DN_KEY,
+              instance);
       ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
 
-      Assert.assertTrue(_server.isRegistered(instanceObjectName));
-      Assert.assertEquals(_server.getAttribute(instanceObjectName, "MaxCapacityUsageGauge"), usage);
+      try {
+        _server.getAttribute(instanceObjectName, removedAttribute);
+        Assert.fail();
+      } catch (AttributeNotFoundException ex) {
+        // Expected AttributeNotFoundException because "capacity2Gauge" metric does not exist in
+        // MBean server.
+      }
     }
 
+    verifyCapacityMetrics(monitor, maxUsageMap, instanceCapacityMap);
+
     // Reset monitor.
     monitor.reset();
     Assert.assertFalse(_server.isRegistered(clusterMonitorObjName),
         "Failed to unregister ClusterStatusMonitor.");
     for (String instance : maxUsageMap.keySet()) {
       String instanceBeanName =
-          String.format("%s,%s=%s", monitor.clusterBeanName(), monitor.INSTANCE_DN_KEY, instance);
+          String.format("%s,%s=%s", monitor.clusterBeanName(), ClusterStatusMonitor.INSTANCE_DN_KEY, instance);
       ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
       Assert.assertFalse(_server.isRegistered(instanceObjectName),
           "Failed to unregister instance monitor for instance: " + instance);
     }
   }
+
+  private void verifyCapacityMetrics(ClusterStatusMonitor monitor, Map<String, Double> maxUsageMap,
+      Map<String, Map<String, Integer>> instanceCapacityMap)
+      throws MalformedObjectNameException, IOException, AttributeNotFoundException, MBeanException,
+             ReflectionException, InstanceNotFoundException {
+    // Verify results.
+    for (Map.Entry<String, Map<String, Integer>> instanceEntry : instanceCapacityMap.entrySet()) {
+      String instance = instanceEntry.getKey();
+      Map<String, Integer> capacityMap = instanceEntry.getValue();
+      String instanceBeanName = String
+          .format("%s,%s=%s", monitor.clusterBeanName(), ClusterStatusMonitor.INSTANCE_DN_KEY,
+              instance);
+      ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+
+      Assert.assertTrue(_server.isRegistered(instanceObjectName));
+      Assert.assertEquals(_server.getAttribute(instanceObjectName,
+          InstanceMonitor.InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName()),
+          maxUsageMap.get(instance));
+
+      for (Map.Entry<String, Integer> capacityEntry : capacityMap.entrySet()) {
+        String capacityKey = capacityEntry.getKey();
+        String attributeName = capacityKey + "Gauge";
+        Assert.assertEquals((long) _server.getAttribute(instanceObjectName, attributeName),
+            (long) instanceCapacityMap.get(instance).get(capacityKey));
+      }
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
index f2b7631..5119d81 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
@@ -3,6 +3,7 @@ package org.apache.helix.monitoring.mbeans;
 import java.lang.management.ManagementFactory;
 import java.util.HashSet;
 import java.util.Set;
+import javax.management.AttributeNotFoundException;
 import javax.management.JMException;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -82,8 +83,15 @@ public class TestRoutingTableProviderMonitor {
     Assert.assertEquals((long) _beanServer.getAttribute(name, "EventQueueSizeGauge"), 15);
     Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshLatencyGauge.Max"), 0);
     Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshCounter"), 0);
+
     // StatePropagationLatencyGauge only apply for current state
-    Assert.assertEquals(_beanServer.getAttribute(name, "StatePropagationLatencyGauge.Max"), null);
+    try {
+      _beanServer.getAttribute(name, "StatePropagationLatencyGauge.Max");
+      Assert.fail();
+    } catch (AttributeNotFoundException ex) {
+      // Expected AttributeNotFoundException because the metric does not exist in
+      // MBean server.
+    }
 
     long startTime = System.currentTimeMillis();
     Thread.sleep(5);
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
index 22be0a5..2695ee4 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
@@ -20,6 +20,7 @@ package org.apache.helix.monitoring.mbeans;
  */
 
 import java.lang.management.ManagementFactory;
+import javax.management.AttributeNotFoundException;
 import javax.management.JMException;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -117,7 +118,13 @@ public class TestZkClientMonitor {
     requestGauge = (long) _beanServer.getAttribute(name, "OutstandingRequestGauge");
     Assert.assertEquals(requestGauge, 0);
 
-    Assert.assertNull(_beanServer.getAttribute(name, "PendingCallbackGauge"));
+    try {
+      _beanServer.getAttribute(name, "PendingCallbackGauge");
+      Assert.fail();
+    } catch (AttributeNotFoundException ex) {
+      // Expected AttributeNotFoundException because the metric does not exist in
+      // MBean server.
+    }
 
     monitor.record("TEST/IDEALSTATES/myResource", 0, System.currentTimeMillis() - 10,
         ZkClientMonitor.AccessType.READ);