You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2019/11/02 00:54:17 UTC

[helix] branch wagedRebalancer updated: Add max capacity usage metric for instance monitor. (#548)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 1d01466  Add max capacity usage metric for instance monitor. (#548)
1d01466 is described below

commit 1d014669321154a646a6fb9c60009a357742933f
Author: Huizhi L <ih...@gmail.com>
AuthorDate: Fri Nov 1 17:54:11 2019 -0700

    Add max capacity usage metric for instance monitor. (#548)
    
    We need to monitor instance's max utilization in purpose of understanding what the max capacity usage is and knowing the status of the instance.
    
    Change list:
    1. Change instance monitor to extend dynamic metric, and change code logic in ClusterStatusMonitor to adapt the InstanceMonitor changes.
    2. Add APIs for get/update MaxCapacityUsage.
    3. Add an API in cluster status monitor to update max capacity usage.
    4. Add unit tests for instance monitor and updateing max capacity usage.
---
 .../rebalancer/waged/WagedRebalancer.java          |  22 +--
 .../waged/model/ClusterModelProvider.java          |  17 +++
 .../stages/CurrentStateComputationStage.java       |  42 ++++++
 .../controller/stages/CurrentStateOutput.java      |  24 ++++
 .../monitoring/mbeans/ClusterStatusMonitor.java    |  72 ++++++----
 .../helix/monitoring/mbeans/InstanceMonitor.java   | 149 +++++++++++++++------
 .../monitoring/mbeans/InstanceMonitorMBean.java    |  51 -------
 .../mbeans/TestClusterStatusMonitor.java           | 136 +++++++++++++------
 .../monitoring/mbeans/TestInstanceMonitor.java     |  76 +++++++++++
 9 files changed, 418 insertions(+), 171 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index a88f809..605dcd1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -540,7 +540,7 @@ public class WagedRebalancer {
     }
     if (currentBaseline.isEmpty()) {
       LOG.warn("The current baseline assignment record is empty. Use the current states instead.");
-      currentBaseline = getCurrentStateAssingment(currentStateOutput, resources);
+      currentBaseline = currentStateOutput.getAssignment(resources);
     }
     currentBaseline.keySet().retainAll(resources);
     return currentBaseline;
@@ -575,30 +575,12 @@ public class WagedRebalancer {
     if (currentBestAssignment.isEmpty()) {
       LOG.warn(
           "The current best possible assignment record is empty. Use the current states instead.");
-      currentBestAssignment = getCurrentStateAssingment(currentStateOutput, resources);
+      currentBestAssignment = currentStateOutput.getAssignment(resources);
     }
     currentBestAssignment.keySet().retainAll(resources);
     return currentBestAssignment;
   }
 
-  private Map<String, ResourceAssignment> getCurrentStateAssingment(
-      CurrentStateOutput currentStateOutput, Set<String> resourceSet) {
-    Map<String, ResourceAssignment> currentStateAssignment = new HashMap<>();
-    for (String resourceName : resourceSet) {
-      Map<Partition, Map<String, String>> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resourceName);
-      if (!currentStateMap.isEmpty()) {
-        ResourceAssignment newResourceAssignment = new ResourceAssignment(resourceName);
-        currentStateMap.entrySet().stream().forEach(currentStateEntry -> {
-          newResourceAssignment.addReplicaMap(currentStateEntry.getKey(),
-              currentStateEntry.getValue());
-        });
-        currentStateAssignment.put(resourceName, newResourceAssignment);
-      }
-    }
-    return currentStateAssignment;
-  }
-
   /**
    * Schedule rebalance according to the delayed rebalance logic.
    * @param clusterData the current cluster data cache
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 4722e7d..f777534 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -94,6 +94,23 @@ public class ClusterModelProvider {
   }
 
   /**
+   * Generate a cluster model based on the current state output and data cache.
+   * @param dataProvider           The controller's data cache.
+   * @param resourceMap            The full list of the resources to be rebalanced. Note that any
+   *                               resources that are not in this list will be removed from the
+   *                               final assignment.
+   * @param currentStateAssignment The resource assignment built from current state output.
+   * @return A cluster model based on the current state and data cache.
+   */
+  public static ClusterModel generateClusterModelFromCurrentState(
+      ResourceControllerDataProvider dataProvider,
+      Map<String, Resource> resourceMap,
+      Map<String, ResourceAssignment> currentStateAssignment) {
+    return generateClusterModel(dataProvider, resourceMap, dataProvider.getEnabledLiveInstances(),
+        Collections.emptyMap(), Collections.emptyMap(), currentStateAssignment);
+  }
+
+  /**
    * Find the minimum set of replicas that need to be reassigned.
    * A replica needs to be reassigned if one of the following condition is true:
    * 1. Cluster topology (the cluster config / any instance config) has been updated.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index d5c65cd..38c2261 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -20,14 +20,21 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
 import org.apache.helix.model.*;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +52,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     _eventId = event.getEventId();
     BaseControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
     final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
+    final Map<String, Resource> resourceToRebalance =
+        event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
 
     if (cache == null || resourceMap == null) {
       throw new StageException("Missing attributes in event:" + event
@@ -69,6 +78,13 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       updateCurrentStates(instance, currentStateMap.values(), currentStateOutput, resourceMap);
     }
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+
+    final ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    if (clusterStatusMonitor != null && cache instanceof ResourceControllerDataProvider) {
+      reportInstanceCapacityMetrics(clusterStatusMonitor, (ResourceControllerDataProvider) cache,
+          resourceToRebalance, currentStateOutput);
+    }
   }
 
   // update all pending messages to CurrentStateOutput.
@@ -215,4 +231,30 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       currentStateOutput.setCancellationMessage(resourceName, partition, instanceName, message);
     }
   }
+
+  private void reportInstanceCapacityMetrics(ClusterStatusMonitor clusterStatusMonitor,
+      ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
+      CurrentStateOutput currentStateOutput) {
+    asyncExecute(dataProvider.getAsyncTasksThreadPool(), () -> {
+      try {
+        Map<String, ResourceAssignment> currentStateAssignment =
+            currentStateOutput.getAssignment(resourceMap.keySet());
+        ClusterModel clusterModel = ClusterModelProvider.generateClusterModelFromCurrentState(
+            dataProvider, resourceMap, currentStateAssignment);
+
+        Map<String, Double> maxUsageMap = new HashMap<>();
+        for (AssignableNode node : clusterModel.getAssignableNodes().values()) {
+          String instanceName = node.getInstanceName();
+          double usage = node.getProjectedHighestUtilization(Collections.emptyMap());
+          maxUsageMap.put(instanceName, usage);
+        }
+
+        clusterStatusMonitor.updateInstanceMaxUsage(maxUsageMap);
+      } catch (Exception ex) {
+        LOG.error("Failed to report instance capacity metrics.", ex);
+      }
+
+      return null;
+    });
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 13e1dbf..87dcae3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -28,6 +28,8 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 
 import com.google.common.collect.Sets;
+import org.apache.helix.model.ResourceAssignment;
+
 
 /**
  * The current state includes both current state and pending messages
@@ -428,4 +430,26 @@ public class CurrentStateOutput {
     return sb.toString();
   }
 
+  /**
+   * Get current state assignment for a set of resources.
+   * @param resourceSet a set of resources' names
+   * @return a map of current state resource assignment, {resourceName: resourceAssignment}
+   */
+  public Map<String, ResourceAssignment> getAssignment(Set<String> resourceSet) {
+    Map<String, ResourceAssignment> currentStateAssignment = new HashMap<>();
+    for (String resourceName : resourceSet) {
+      Map<Partition, Map<String, String>> currentStateMap =
+          getCurrentStateMap(resourceName);
+      if (!currentStateMap.isEmpty()) {
+        ResourceAssignment newResourceAssignment = new ResourceAssignment(resourceName);
+        currentStateMap.entrySet().stream().forEach(currentStateEntry -> {
+          newResourceAssignment.addReplicaMap(currentStateEntry.getKey(),
+              currentStateEntry.getValue());
+        });
+        currentStateAssignment.put(resourceName, newResourceAssignment);
+      }
+    }
+
+    return currentStateAssignment;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 5e8c17a..97d0a96 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -236,27 +236,28 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       // Unregister beans for instances that are no longer configured
       Set<String> toUnregister = Sets.newHashSet(_instanceMonitorMap.keySet());
       toUnregister.removeAll(instanceSet);
-      try {
-        unregisterInstances(toUnregister);
-      } catch (MalformedObjectNameException e) {
-        LOG.error("Could not unregister instances from MBean server: " + toUnregister, e);
-      }
+      unregisterInstances(toUnregister);
 
       // Register beans for instances that are newly configured
       Set<String> toRegister = Sets.newHashSet(instanceSet);
       toRegister.removeAll(_instanceMonitorMap.keySet());
       Set<InstanceMonitor> monitorsToRegister = Sets.newHashSet();
       for (String instanceName : toRegister) {
-        InstanceMonitor bean = new InstanceMonitor(_clusterName, instanceName);
-        bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
-            oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
-            !disabledInstanceSet.contains(instanceName));
-        monitorsToRegister.add(bean);
+        try {
+          ObjectName objectName = getObjectName(getInstanceBeanName(instanceName));
+          InstanceMonitor bean = new InstanceMonitor(_clusterName, instanceName, objectName);
+          bean.updateInstance(tags.get(instanceName), disabledPartitions.get(instanceName),
+              oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
+              !disabledInstanceSet.contains(instanceName));
+          monitorsToRegister.add(bean);
+        } catch (MalformedObjectNameException ex) {
+          LOG.error("Failed to create instance monitor for instance: {}.", instanceName);
+        }
       }
       try {
         registerInstances(monitorsToRegister);
-      } catch (MalformedObjectNameException e) {
-        LOG.error("Could not register instances with MBean server: " + toRegister, e);
+      } catch (JMException e) {
+        LOG.error("Could not register instances with MBean server: {}.", toRegister, e);
       }
 
       // Update all the sets
@@ -282,8 +283,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
             try {
               unregisterInstances(Arrays.asList(instanceName));
               registerInstances(Arrays.asList(bean));
-            } catch (MalformedObjectNameException e) {
-              LOG.error("Could not refresh registration with MBean server: " + instanceName, e);
+            } catch (JMException e) {
+              LOG.error("Could not refresh registration with MBean server: {}", instanceName, e);
             }
           }
         }
@@ -366,6 +367,27 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   /**
+   * Update max capacity usage for per instance. Before calling this API, we assume the instance
+   * monitors are already registered in ReadClusterDataStage. If the monitor is not registered, this
+   * max usage update will fail.
+   *
+   * @param maxUsageMap a map of max capacity usage, {instance: maxCapacityUsage}
+   */
+  public void updateInstanceMaxUsage(Map<String, Double> maxUsageMap) {
+    synchronized (_instanceMonitorMap) {
+      for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
+        InstanceMonitor monitor = _instanceMonitorMap.get(entry.getKey());
+        if (monitor == null) {
+          LOG.warn("Failed to update max usage because instance monitor is not found, instance: {}.",
+              entry.getKey());
+          continue;
+        }
+        monitor.updateMaxCapacityUsage(entry.getValue());
+      }
+    }
+  }
+
+  /**
    * Update gauges for resource at instance level
    * @param bestPossibleStates
    * @param resourceMap
@@ -694,31 +716,35 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   private void registerInstances(Collection<InstanceMonitor> instances)
-      throws MalformedObjectNameException {
+      throws JMException {
     synchronized (_instanceMonitorMap) {
       for (InstanceMonitor monitor : instances) {
         String instanceName = monitor.getInstanceName();
-        String beanName = getInstanceBeanName(instanceName);
-        register(monitor, getObjectName(beanName));
+        // If this instance MBean is already registered, unregister it.
+        InstanceMonitor removedMonitor = _instanceMonitorMap.remove(instanceName);
+        if (removedMonitor != null) {
+          removedMonitor.unregister();
+        }
+        monitor.register();
         _instanceMonitorMap.put(instanceName, monitor);
       }
     }
   }
 
-  private void unregisterAllInstances() throws MalformedObjectNameException {
+  private void unregisterAllInstances() {
     synchronized (_instanceMonitorMap) {
       unregisterInstances(_instanceMonitorMap.keySet());
     }
   }
 
-  private void unregisterInstances(Collection<String> instances)
-      throws MalformedObjectNameException {
+  private void unregisterInstances(Collection<String> instances) {
     synchronized (_instanceMonitorMap) {
       for (String instanceName : instances) {
-        String beanName = getInstanceBeanName(instanceName);
-        unregister(getObjectName(beanName));
+        InstanceMonitor monitor = _instanceMonitorMap.remove(instanceName);
+        if (monitor != null) {
+          monitor.unregister();
+        }
       }
-      _instanceMonitorMap.keySet().removeAll(instances);
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index dc43d48..eea6eaf 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -24,35 +24,86 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import javax.management.JMException;
+import javax.management.ObjectName;
+
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
 
 /**
  * Implementation of the instance status bean
  */
-public class InstanceMonitor implements InstanceMonitorMBean {
+public class InstanceMonitor extends DynamicMBeanProvider {
+  /**
+   * Metric names for instance capacity.
+   */
+  public enum InstanceMonitorMetrics {
+    // TODO: change the metric names with Counter and Gauge suffix and deprecate old names.
+    TOTAL_MESSAGE_RECEIVED_COUNTER("TotalMessageReceived"),
+    ENABLED_STATUS_GAUGE("Enabled"),
+    ONLINE_STATUS_GAUGE("Online"),
+    DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
+    MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge");
+
+    private String metricName;
+
+    InstanceMonitorMetrics(String name) {
+      metricName = name;
+    }
+
+    public String metricName() {
+      return metricName;
+    }
+  }
+
   private final String _clusterName;
   private final String _participantName;
+  private final ObjectName _initObjectName;
+
   private List<String> _tags;
-  private long _disabledPartitions;
-  private boolean _isUp;
-  private boolean _isEnabled;
-  private long _totalMessageReceived;
+
+  // Counters
+  private SimpleDynamicMetric<Long> _totalMessagedReceivedCounter;
+
+  // Gauges
+  private SimpleDynamicMetric<Long> _enabledStatusGauge;
+  private SimpleDynamicMetric<Long> _disabledPartitionsGauge;
+  private SimpleDynamicMetric<Long> _onlineStatusGauge;
+  private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;
 
   /**
    * Initialize the bean
    * @param clusterName the cluster to monitor
    * @param participantName the instance whose statistics this holds
    */
-  public InstanceMonitor(String clusterName, String participantName) {
+  public InstanceMonitor(String clusterName, String participantName, ObjectName objectName) {
     _clusterName = clusterName;
     _participantName = participantName;
     _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
-    _disabledPartitions = 0L;
-    _isUp = false;
-    _isEnabled = false;
-    _totalMessageReceived = 0;
+    _initObjectName = objectName;
+
+    createMetrics();
+  }
+
+  private void createMetrics() {
+    _totalMessagedReceivedCounter = new SimpleDynamicMetric<>(
+        InstanceMonitorMetrics.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);
+
+    _disabledPartitionsGauge =
+        new SimpleDynamicMetric<>(InstanceMonitorMetrics.DISABLED_PARTITIONS_GAUGE.metricName(),
+            0L);
+    _enabledStatusGauge =
+        new SimpleDynamicMetric<>(InstanceMonitorMetrics.ENABLED_STATUS_GAUGE.metricName(), 0L);
+    _onlineStatusGauge =
+        new SimpleDynamicMetric<>(InstanceMonitorMetrics.ONLINE_STATUS_GAUGE.metricName(), 0L);
+    _maxCapacityUsageGauge =
+        new SimpleDynamicMetric<>(InstanceMonitorMetrics.MAX_CAPACITY_USAGE_GAUGE.metricName(),
+            0.0d);
   }
 
   @Override
@@ -61,44 +112,32 @@ public class InstanceMonitor implements InstanceMonitorMBean {
         serializedTags(), _participantName);
   }
 
-  @Override
-  public long getOnline() {
-    return _isUp ? 1 : 0;
-  }
-
-  @Override
-  public long getEnabled() {
-    return _isEnabled ? 1 : 0;
+  protected long getOnline() {
+    return _onlineStatusGauge.getValue();
   }
 
-  @Override
-  public long getTotalMessageReceived() {
-    return _totalMessageReceived;
+  protected long getEnabled() {
+    return _enabledStatusGauge.getValue();
   }
 
-  @Override
-  public long getDisabledPartitions() {
-    return _disabledPartitions;
+  protected long getTotalMessageReceived() {
+    return _totalMessagedReceivedCounter.getValue();
   }
 
-  /**
-   * Get all the tags currently on this instance
-   * @return list of tags
-   */
-  public List<String> getTags() {
-    return _tags;
+  protected long getDisabledPartitions() {
+    return _disabledPartitionsGauge.getValue();
   }
 
   /**
    * Get the name of the monitored instance
    * @return instance name as a string
    */
-  public String getInstanceName() {
+  protected String getInstanceName() {
     return _participantName;
   }
 
   private String serializedTags() {
-    return Joiner.on('|').skipNulls().join(_tags).toString();
+    return Joiner.on('|').skipNulls().join(_tags);
   }
 
   /**
@@ -117,20 +156,22 @@ public class InstanceMonitor implements InstanceMonitorMBean {
       _tags = Lists.newArrayList(tags);
       Collections.sort(_tags);
     }
-    _disabledPartitions = 0L;
+    long numDisabledPartitions = 0L;
     if (disabledPartitions != null) {
       for (List<String> partitions : disabledPartitions.values()) {
         if (partitions != null) {
-          _disabledPartitions += partitions.size();
+          numDisabledPartitions += partitions.size();
         }
       }
     }
     // TODO : Get rid of this when old API removed.
     if (oldDisabledPartitions != null) {
-      _disabledPartitions += oldDisabledPartitions.size();
+      numDisabledPartitions += oldDisabledPartitions.size();
     }
-    _isUp = isLive;
-    _isEnabled = isEnabled;
+
+    _onlineStatusGauge.updateValue(isLive ? 1L : 0L);
+    _enabledStatusGauge.updateValue(isEnabled ? 1L : 0L);
+    _disabledPartitionsGauge.updateValue(numDisabledPartitions);
   }
 
   /**
@@ -138,7 +179,39 @@ public class InstanceMonitor implements InstanceMonitorMBean {
    * @param messageReceived received message numbers
    */
   public synchronized void increaseMessageCount(long messageReceived) {
-    _totalMessageReceived += messageReceived;
+    _totalMessagedReceivedCounter
+        .updateValue(_totalMessagedReceivedCounter.getValue() + messageReceived);
   }
 
+  /**
+   * Update max capacity usage for this instance.
+   * @param maxUsage max capacity usage of this instance
+   */
+  public synchronized void updateMaxCapacityUsage(double maxUsage) {
+    _maxCapacityUsageGauge.updateValue(maxUsage);
+  }
+
+  /**
+   * Get max capacity usage of this instance.
+   * @return Max capacity usage of this instance.
+   */
+  protected synchronized double getMaxCapacityUsageGauge() {
+    return _maxCapacityUsageGauge.getValue();
+  }
+
+  @Override
+  public DynamicMBeanProvider register()
+      throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = ImmutableList.of(
+        _totalMessagedReceivedCounter,
+        _disabledPartitionsGauge,
+        _enabledStatusGauge,
+        _onlineStatusGauge,
+        _maxCapacityUsageGauge
+    );
+
+    doRegister(attributeList, _initObjectName);
+
+    return this;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
deleted file mode 100644
index a3221d8..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-/**
- * A basic bean describing the status of a single instance
- */
-public interface InstanceMonitorMBean extends SensorNameProvider {
-  /**
-   * Check if this instance is live
-   * @return 1 if running, 0 otherwise
-   */
-  public long getOnline();
-
-  /**
-   * Check if this instance is enabled
-   * @return 1 if enabled, 0 if disabled
-   */
-  public long getEnabled();
-
-  /**
-   * Get total message received for this instances
-   * @return The total number of messages sent to this instance
-   */
-  public long getTotalMessageReceived();
-
-  /**
-   * Get the total disabled partitions number for this instance
-   * @return The total number of disabled partitions
-   */
-  public long getDisabledPartitions();
-}
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index b2daba6..cdd8688 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -19,18 +19,27 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import com.beust.jcommander.internal.Maps;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import javax.management.AttributeNotFoundException;
 import javax.management.InstanceNotFoundException;
 import javax.management.JMException;
+import javax.management.MBeanException;
 import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
@@ -47,13 +56,15 @@ import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+
 public class TestClusterStatusMonitor {
   private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
-  String testDB = "TestDB";
-  String testDB_0 = testDB + "_0";
+  private String testDB = "TestDB";
+  private String testDB_0 = testDB + "_0";
 
   @Test()
-  public void testReportData() throws Exception {
+  public void testReportData()
+      throws Exception {
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
@@ -64,11 +75,8 @@ public class TestClusterStatusMonitor {
     ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
     monitor.active();
     ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
-    try {
-      _server.getMBeanInfo(clusterMonitorObjName);
-    } catch (Exception e) {
-      Assert.fail("Fail to register ClusterStatusMonitor");
-    }
+
+    Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
 
     // Test #setPerInstanceResourceStatus()
     BestPossibleStateOutput bestPossibleStates = new BestPossibleStateOutput();
@@ -137,42 +145,30 @@ public class TestClusterStatusMonitor {
         "localhost_12918");
     monitor.setPerInstanceResourceStatus(bestPossibleStates, instanceConfigMap, resourceMap,
         stateModelDefMap);
-    try {
-      objName =
-          monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
-      _server.getMBeanInfo(objName);
-      Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12918");
-
-    } catch (InstanceNotFoundException e) {
-      // OK
-    }
+
+    objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12918", testDB));
+    Assert.assertFalse(_server.isRegistered(objName),
+        "Fail to unregister PerInstanceResource mbean for localhost_12918");
 
     // Clean up
     monitor.reset();
 
-    try {
-      objName =
-          monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12920", testDB));
-      _server.getMBeanInfo(objName);
-      Assert.fail("Fail to unregister PerInstanceResource mbean for localhost_12920");
-
-    } catch (InstanceNotFoundException e) {
-      // OK
-    }
+    objName =
+        monitor.getObjectName(monitor.getPerInstanceResourceBeanName("localhost_12920", testDB));
+    Assert.assertFalse(_server.isRegistered(objName),
+        "Fail to unregister PerInstanceResource mbean for localhost_12920");
 
-    try {
-      _server.getMBeanInfo(clusterMonitorObjName);
-      Assert.fail("Fail to unregister ClusterStatusMonitor");
-    } catch (InstanceNotFoundException e) {
-      // OK
-    }
+    Assert.assertFalse(_server.isRegistered(clusterMonitorObjName),
+        "Failed to unregister ClusterStatusMonitor.");
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
 
   @Test
-  public void testResourceAggregation() throws JMException {
+  public void testResourceAggregation()
+      throws JMException, IOException {
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
@@ -182,11 +178,8 @@ public class TestClusterStatusMonitor {
     ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
     monitor.active();
     ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
-    try {
-      _server.getMBeanInfo(clusterMonitorObjName);
-    } catch (Exception e) {
-      Assert.fail("Fail to register ClusterStatusMonitor");
-    }
+
+    Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
 
     int numInstance = 5;
     int numPartition = 10;
@@ -315,5 +308,70 @@ public class TestClusterStatusMonitor {
     messageCount = new Random().nextInt(numPartition) + 1;
     monitor.setResourceStatus(externalView, idealState, stateModelDef, messageCount);
     Assert.assertEquals(monitor.getPendingStateTransitionGuage(), messageCount);
+
+    // Reset monitor.
+    monitor.reset();
+    Assert.assertFalse(_server.isRegistered(clusterMonitorObjName),
+        "Failed to unregister ClusterStatusMonitor.");
+  }
+
+  @Test
+  public void testUpdateMaxCapacityUsage()
+      throws MalformedObjectNameException, IOException, AttributeNotFoundException, MBeanException,
+             ReflectionException, InstanceNotFoundException {
+    String clusterName = "testCluster";
+    List<Double> maxUsageList = ImmutableList.of(0.0d, 0.32d, 0.85d, 1.0d, 0.50d, 0.75d);
+    Map<String, Double> maxUsageMap = new HashMap<>();
+    for (int i = 0; i < maxUsageList.size(); i++) {
+      maxUsageMap.put("instance" + i, maxUsageList.get(i));
+    }
+
+    // Setup cluster status monitor.
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+    monitor.active();
+    ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+
+    Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
+
+    // Before calling setClusterInstanceStatus, instance monitors are not yet registered.
+    for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
+      String instance = entry.getKey();
+      String instanceBeanName =
+          String.format("%s,%s=%s", monitor.clusterBeanName(), monitor.INSTANCE_DN_KEY, instance);
+      ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+
+      Assert.assertFalse(_server.isRegistered(instanceObjectName));
+    }
+
+    // Call setClusterInstanceStatus to register instance monitors.
+    monitor.setClusterInstanceStatus(maxUsageMap.keySet(), maxUsageMap.keySet(),
+        Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap(),
+        Collections.emptyMap());
+    // Update max usage stats.
+    monitor.updateInstanceMaxUsage(maxUsageMap);
+
+    // Verify results.
+    for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
+      String instance = entry.getKey();
+      double usage = entry.getValue();
+      String instanceBeanName =
+          String.format("%s,%s=%s", monitor.clusterBeanName(), monitor.INSTANCE_DN_KEY, instance);
+      ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+
+      Assert.assertTrue(_server.isRegistered(instanceObjectName));
+      Assert.assertEquals(_server.getAttribute(instanceObjectName, "MaxCapacityUsageGauge"), usage);
+    }
+
+    // Reset monitor.
+    monitor.reset();
+    Assert.assertFalse(_server.isRegistered(clusterMonitorObjName),
+        "Failed to unregister ClusterStatusMonitor.");
+    for (String instance : maxUsageMap.keySet()) {
+      String instanceBeanName =
+          String.format("%s,%s=%s", monitor.clusterBeanName(), monitor.INSTANCE_DN_KEY, instance);
+      ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+      Assert.assertFalse(_server.isRegistered(instanceObjectName),
+          "Failed to unregister instance monitor for instance: " + instance);
+    }
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
new file mode 100644
index 0000000..709c5f7
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
@@ -0,0 +1,76 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.management.JMException;
+import javax.management.ObjectName;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestInstanceMonitor {
+  @Test
+  public void testInstanceMonitor()
+      throws JMException {
+    String testCluster = "testCluster";
+    String testInstance = "testInstance";
+    String testDomain = "testDomain:key=value";
+    Set<String> tags = ImmutableSet.of("test", "DEFAULT");
+    Map<String, List<String>> disabledPartitions =
+        ImmutableMap.of("instance1", ImmutableList.of("partition1", "partition2"));
+    InstanceMonitor monitor =
+        new InstanceMonitor(testCluster, testInstance, new ObjectName(testDomain));
+
+    // Verify init status.
+    Assert.assertEquals(monitor.getSensorName(),
+        "ParticipantStatus.testCluster.DEFAULT.testInstance");
+    Assert.assertEquals(monitor.getInstanceName(), testInstance);
+    Assert.assertEquals(monitor.getOnline(), 0L);
+    Assert.assertEquals(monitor.getEnabled(), 0L);
+    Assert.assertEquals(monitor.getTotalMessageReceived(), 0L);
+    Assert.assertEquals(monitor.getDisabledPartitions(), 0L);
+    Assert.assertEquals(monitor.getMaxCapacityUsageGauge(), 0.0d);
+
+    // Update metrics.
+    monitor.updateMaxCapacityUsage(0.5d);
+    monitor.increaseMessageCount(10L);
+    monitor.updateInstance(tags, disabledPartitions, Collections.emptyList(), true, true);
+
+    // Verify metrics.
+    Assert.assertEquals(monitor.getTotalMessageReceived(), 10L);
+    Assert.assertEquals(monitor.getSensorName(),
+        "ParticipantStatus.testCluster.DEFAULT|test.testInstance");
+    Assert.assertEquals(monitor.getInstanceName(), testInstance);
+    Assert.assertEquals(monitor.getOnline(), 1L);
+    Assert.assertEquals(monitor.getEnabled(), 1L);
+    Assert.assertEquals(monitor.getDisabledPartitions(), 2L);
+    Assert.assertEquals(monitor.getMaxCapacityUsageGauge(), 0.5d);
+
+    monitor.unregister();
+  }
+}