You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2020/07/21 16:50:59 UTC

[helix] branch master updated: Add Cluster and instance level metrics to report the number of messages that have not been completed after their expected completion time

This is an automated email from the ASF dual-hosted git repository.

lxia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b29a6e  Add Cluster and instance level metrics to report the number of messages that have not been completed after their expected completion time
     new dc2e8bb  Merge pull request #1111 from lei-xia/master
9b29a6e is described below

commit 9b29a6e2f13344fa43f81d2bb21845cb65a83c7d
Author: Lei Xia <lx...@linkedin.com>
AuthorDate: Sat Jun 20 11:04:26 2020 -0700

    Add Cluster and instance level metrics to report the number of messages that have not been completed after their expected completion time
    
    Changes in this commit:
    1) Add an expected completion time field to Message, and set the default expected completion period to 1 day (configurable via a system property).
    2) Add a past-due incomplete message gauge to both the cluster and instance monitors.
---
 .../java/org/apache/helix/SystemPropertyKeys.java  |   4 +
 .../helix/controller/GenericHelixController.java   |   4 -
 .../controller/stages/ReadClusterDataStage.java    |   8 +-
 .../main/java/org/apache/helix/model/Message.java  |  42 ++++++--
 .../monitoring/mbeans/ClusterStatusMonitor.java    |  88 ++++++++++------
 .../mbeans/ClusterStatusMonitorMBean.java          |   7 ++
 .../helix/monitoring/mbeans/InstanceMonitor.java   |  36 ++++++-
 .../mbeans/TestClusterStatusMonitor.java           | 111 ++++++++++++++++++++-
 .../monitoring/mbeans/TestInstanceMonitor.java     |   4 +
 9 files changed, 262 insertions(+), 42 deletions(-)

diff --git a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index c71d5c6..e9d8fcb 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -69,6 +69,10 @@ public class SystemPropertyKeys {
   // Controller
   public static final String CONTROLLER_MESSAGE_PURGE_DELAY = "helix.controller.stages.MessageGenerationPhase.messagePurgeDelay";
 
+  // Message
+  public static final String MESSAGE_EXPECTED_COMPLETION_PERIOD = "helix.controller.message.ExpectMessageCompletionPeriod";
+
+
   // MBean monitor for helix.
   public static final String HELIX_MONITOR_TIME_WINDOW_LENGTH_MS = "helix.monitor.slidingTimeWindow.ms";
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 683845c..238242f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -942,10 +942,6 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     pushToEventQueues(ClusterEventType.MessageChange, changeContext,
         Collections.<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName));
 
-    if (_isMonitoring && messages != null) {
-      _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
-    }
-
     logger.info("END: GenericClusterController.onMessage() for cluster " + _clusterName);
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index d284c8e..535fadf 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -19,6 +19,8 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -37,6 +39,7 @@ import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,6 +80,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
             Map<String, List<String>> oldDisabledPartitions = Maps.newHashMap();
             Map<String, Set<String>> tags = Maps.newHashMap();
             Map<String, LiveInstance> liveInstanceMap = dataProvider.getLiveInstances();
+            Map<String, Set<Message>> instanceMessageMap = Maps.newHashMap();
             for (Map.Entry<String, InstanceConfig> e : dataProvider.getInstanceConfigMap()
                 .entrySet()) {
               String instanceName = e.getKey();
@@ -84,6 +88,8 @@ public class ReadClusterDataStage extends AbstractBaseStage {
               instanceSet.add(instanceName);
               if (liveInstanceMap.containsKey(instanceName)) {
                 liveInstanceSet.add(instanceName);
+                instanceMessageMap.put(instanceName,
+                    Sets.newHashSet(dataProvider.getMessages(instanceName).values()));
               }
               if (!config.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
                   && clusterConfig.getDisabledInstances().containsKey(instanceName))) {
@@ -99,7 +105,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
             }
             clusterStatusMonitor
                 .setClusterInstanceStatus(liveInstanceSet, instanceSet, disabledInstanceSet,
-                    disabledPartitions, oldDisabledPartitions, tags);
+                    disabledPartitions, oldDisabledPartitions, tags, instanceMessageMap);
             LogUtil.logDebug(logger, _eventId, "Complete cluster status monitors update.");
           }
           return null;
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index eefecec..7d9e047 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
 import org.apache.helix.HelixException;
@@ -36,6 +37,8 @@ import org.apache.helix.HelixProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
@@ -79,6 +82,7 @@ public class Message extends HelixProperty {
     TO_STATE,
     STATE_MODEL_DEF,
     CREATE_TIMESTAMP,
+    COMPLETION_DUE_TIMESTAMP,
     READ_TIMESTAMP,
     EXECUTE_START_TIMESTAMP,
     MSG_TYPE,
@@ -117,6 +121,11 @@ public class Message extends HelixProperty {
   // Currently, the field is only used for invalidating messages in controller's message cache.
   private boolean _expired = false;
 
+  // The expected period of time (in ms) within which a message should be completed; defaults to 1 day
+  public static final long MESSAGE_EXPECT_COMPLETION_PERIOD = HelixUtil
+      .getSystemPropertyAsLong(SystemPropertyKeys.MESSAGE_EXPECTED_COMPLETION_PERIOD,
+          TimeUnit.DAYS.toMillis(1));
+
   /**
    * Compares the creation time of two Messages
    */
@@ -167,13 +176,21 @@ public class Message extends HelixProperty {
 
   /**
    * Set the time that the message was created
-   * @param timestamp a UNIX timestamp
+   * @param timestamp a UNIX timestamp (in ms)
    */
   public void setCreateTimeStamp(long timestamp) {
     _record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), timestamp);
   }
 
   /**
+   * Set the time that the message was expected to be completed
+   * @param timestamp a UNIX timestamp (in ms)
+   */
+  public void setCompletionDueTimeStamp(long timestamp) {
+    _record.setLongField(Attributes.COMPLETION_DUE_TIMESTAMP.name(), timestamp);
+  }
+
+  /**
    * Instantiate a message with a new id
    * @param record a ZNRecord corresponding to a message
    * @param id unique message identifier
@@ -490,7 +507,7 @@ public class Message extends HelixProperty {
 
   /**
    * Set the time that this message was read
-   * @param time UNIX timestamp
+   * @param time UNIX timestamp (in ms)
    */
   public void setReadTimeStamp(long time) {
     _record.setLongField(Attributes.READ_TIMESTAMP.toString(), time);
@@ -498,7 +515,7 @@ public class Message extends HelixProperty {
 
   /**
    * Set the time that the instance executes tasks as instructed by this message
-   * @param time UNIX timestamp
+   * @param time UNIX timestamp (in ms)
    */
   public void setExecuteStartTimeStamp(long time) {
     _record.setLongField(Attributes.EXECUTE_START_TIMESTAMP.toString(), time);
@@ -506,7 +523,7 @@ public class Message extends HelixProperty {
 
   /**
    * Get the time that this message was read
-   * @return UNIX timestamp
+   * @return UNIX timestamp (in ms)
    */
   public long getReadTimeStamp() {
     return _record.getLongField(Attributes.READ_TIMESTAMP.toString(), 0L);
@@ -514,7 +531,7 @@ public class Message extends HelixProperty {
 
   /**
    * Get the time that execution occurred as a result of this message
-   * @return UNIX timestamp
+   * @return UNIX timestamp (in ms)
    */
   public long getExecuteStartTimeStamp() {
     return _record.getLongField(Attributes.EXECUTE_START_TIMESTAMP.toString(), 0L);
@@ -522,13 +539,26 @@ public class Message extends HelixProperty {
 
   /**
    * Get the time that this message was created
-   * @return UNIX timestamp
+   * @return UNIX timestamp (in ms)
    */
   public long getCreateTimeStamp() {
     return _record.getLongField(Attributes.CREATE_TIMESTAMP.toString(), 0L);
   }
 
   /**
+   * Get the time that the message was expected to be completed
+   * @return UNIX timestamp (in ms)
+   */
+  public long getCompletionDueTimeStamp() {
+    long completionDue = _record.getLongField(Attributes.COMPLETION_DUE_TIMESTAMP.name(), 0L);
+    if (completionDue == 0) {
+      completionDue = getCreateTimeStamp() + MESSAGE_EXPECT_COMPLETION_PERIOD;
+    }
+
+    return completionDue;
+  }
+
+  /**
    * Set a unique identifier that others can use to refer to this message in replies
    * @param correlationId a unique identifier, usually randomly generated
    */
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index fc0b19d..39ef1e1 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -37,7 +37,6 @@ import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
@@ -82,7 +81,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private Set<String> _disabledInstances = Collections.emptySet();
   private Map<String, Map<String, List<String>>> _disabledPartitions = Collections.emptyMap();
   private Map<String, List<String>> _oldDisabledPartitions = Collections.emptyMap();
-  private Map<String, Long> _instanceMsgQueueSizes = Maps.newConcurrentMap();
+  private AtomicLong _totalMsgQueueSize = new AtomicLong(0L);
+  private AtomicLong _maxInstanceMsgQueueSize = new AtomicLong(0L);
+  private AtomicLong _totalPastDueMsgSize = new AtomicLong(0L);
   private boolean _rebalanceFailure = false;
   private AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
 
@@ -175,22 +176,17 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
 
   @Override
   public long getMaxMessageQueueSizeGauge() {
-    long maxQueueSize = 0;
-    for (Long queueSize : _instanceMsgQueueSizes.values()) {
-      if (queueSize > maxQueueSize) {
-        maxQueueSize = queueSize;
-      }
-    }
-    return maxQueueSize;
+    return _maxInstanceMsgQueueSize.get();
   }
 
   @Override
   public long getInstanceMessageQueueBacklog() {
-    long sum = 0;
-    for (Long queueSize : _instanceMsgQueueSizes.values()) {
-      sum += queueSize;
-    }
-    return sum;
+    return _totalMsgQueueSize.get();
+  }
+
+  @Override
+  public long getTotalPastDueMessageGauge() {
+    return _totalPastDueMsgSize.get();
   }
 
   private void register(Object bean, ObjectName name) {
@@ -228,10 +224,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
    * @param disabledInstanceSet the current set of configured instances that are disabled
    * @param disabledPartitions a map of instance name to the set of partitions disabled on it
    * @param tags a map of instance name to the set of tags on it
+   * @param instanceMessageMap a map of pending messages from each live instance
    */
   public void setClusterInstanceStatus(Set<String> liveInstanceSet, Set<String> instanceSet,
       Set<String> disabledInstanceSet, Map<String, Map<String, List<String>>> disabledPartitions,
-      Map<String, List<String>> oldDisabledPartitions, Map<String, Set<String>> tags) {
+      Map<String, List<String>> oldDisabledPartitions, Map<String, Set<String>> tags,
+      Map<String, Set<Message>> instanceMessageMap) {
     synchronized (_instanceMonitorMap) {
       // Unregister beans for instances that are no longer configured
       Set<String> toUnregister = Sets.newHashSet(_instanceMonitorMap.keySet());
@@ -254,6 +252,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
           LOG.error("Failed to create instance monitor for instance: {}.", instanceName);
         }
       }
+
       try {
         registerInstances(monitorsToRegister);
       } catch (JMException e) {
@@ -267,6 +266,12 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       _disabledPartitions = disabledPartitions;
       _oldDisabledPartitions = oldDisabledPartitions;
 
+      // message related counts
+      long totalMsgQueueSize = 0L;
+      long maxInstanceMsgQueueSize = 0L;
+      long totalPastDueMsgSize = 0L;
+      long now = System.currentTimeMillis();
+
       // Update the instance MBeans
       for (String instanceName : instanceSet) {
         if (_instanceMonitorMap.containsKey(instanceName)) {
@@ -277,6 +282,24 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
               oldDisabledPartitions.get(instanceName), liveInstanceSet.contains(instanceName),
               !disabledInstanceSet.contains(instanceName));
 
+          // calculate and update instance level message related gauges
+          Set<Message> messages = instanceMessageMap.get(instanceName);
+          if (messages != null) {
+            long msgQueueSize = messages.size();
+            bean.updateMessageQueueSize(msgQueueSize);
+            totalMsgQueueSize += msgQueueSize;
+            if (msgQueueSize > maxInstanceMsgQueueSize) {
+              maxInstanceMsgQueueSize = msgQueueSize;
+            }
+
+            long pastDueMsgCount =
+                messages.stream().filter(m -> (m.getCompletionDueTimeStamp() <= now)).count();
+            bean.updatePastDueMessageGauge(pastDueMsgCount);
+            totalPastDueMsgSize += pastDueMsgCount;
+            LOG.debug("There are totally {} messages, {} are past due on instance {}", msgQueueSize,
+                pastDueMsgCount, instanceName);
+          }
+
           // If the sensor name changed, re-register the bean so that listeners won't miss it
           String newSensorName = bean.getSensorName();
           if (!oldSensorName.equals(newSensorName)) {
@@ -289,6 +312,11 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
           }
         }
       }
+
+      // Update cluster level message related gauges
+      _maxInstanceMsgQueueSize.set(maxInstanceMsgQueueSize);
+      _totalMsgQueueSize.set(totalMsgQueueSize);
+      _totalPastDueMsgSize.set(totalPastDueMsgSize);
     }
   }
 
@@ -324,7 +352,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   /**
-   * Update message count per instance and per resource
+   * Update the total count of messages that the controller has sent to each instance and each resource so far
    * @param messages a list of messages
    */
   public void increaseMessageReceived(List<Message> messages) {
@@ -351,7 +379,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       }
     }
 
-    // Update message count per instance and per resource
+    // Update message count sent per instance and per resource
     for (String instance : messageCountPerInstance.keySet()) {
       InstanceMonitor instanceMonitor = _instanceMonitorMap.get(instance);
       if (instanceMonitor != null) {
@@ -563,10 +591,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public void addMessageQueueSize(String instanceName, long msgQueueSize) {
-    _instanceMsgQueueSizes.put(instanceName, msgQueueSize);
-  }
-
   public void active() {
     LOG.info("Active ClusterStatusMonitor");
     try {
@@ -580,7 +604,6 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     LOG.info("Reset ClusterStatusMonitor");
     try {
       unregisterAllResources();
-      _instanceMsgQueueSizes.clear();
       unregisterAllInstances();
       unregisterAllPerInstanceResources();
       unregister(getObjectName(clusterBeanName()));
@@ -588,7 +611,16 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
       unregisterAllWorkflowsMonitor();
       unregisterAllJobs();
 
+      _liveInstances.clear();
+      _instances.clear();
+      _disabledInstances.clear();
+      _disabledPartitions.clear();
+      _oldDisabledPartitions.clear();
       _rebalanceFailure = false;
+      _maxInstanceMsgQueueSize.set(0L);
+      _totalPastDueMsgSize.set(0L);
+      _totalMsgQueueSize.set(0L);
+      _rebalanceFailureCount.set(0L);
     } catch (Exception e) {
       LOG.error("Fail to reset ClusterStatusMonitor, cluster: " + _clusterName, e);
     }
@@ -872,7 +904,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     return _resourceMonitorMap.get(resourceName);
   }
 
-  public String clusterBeanName() {
+  protected String clusterBeanName() {
     return String.format("%s=%s", CLUSTER_DN_KEY, _clusterName);
   }
 
@@ -881,7 +913,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
    * @param instanceName
    * @return instance bean name
    */
-  private String getInstanceBeanName(String instanceName) {
+  protected String getInstanceBeanName(String instanceName) {
     return String.format("%s,%s=%s", clusterBeanName(), INSTANCE_DN_KEY, instanceName);
   }
 
@@ -890,7 +922,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
    * @param resourceName
    * @return resource bean name
    */
-  private String getResourceBeanName(String resourceName) {
+  protected String getResourceBeanName(String resourceName) {
     return String.format("%s,%s=%s", clusterBeanName(), RESOURCE_DN_KEY, resourceName);
   }
 
@@ -901,7 +933,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
    * @param resourceName
    * @return per-instance resource bean name
    */
-  public String getPerInstanceResourceBeanName(String instanceName, String resourceName) {
+  protected String getPerInstanceResourceBeanName(String instanceName, String resourceName) {
     return String.format("%s,%s", clusterBeanName(),
         new PerInstanceResourceMonitor.BeanName(instanceName, resourceName).toString());
   }
@@ -912,7 +944,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
    * @param workflowType The workflow type
    * @return per workflow type bean name
    */
-  public String getWorkflowBeanName(String workflowType) {
+  protected String getWorkflowBeanName(String workflowType) {
     return String.format("%s, %s=%s", clusterBeanName(), WORKFLOW_TYPE_DN_KEY, workflowType);
   }
 
@@ -922,7 +954,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
    * @param jobType The job type
    * @return per job type bean name
    */
-  public String getJobBeanName(String jobType) {
+  protected String getJobBeanName(String jobType) {
     return String.format("%s, %s=%s", clusterBeanName(), JOB_TYPE_DN_KEY, jobType);
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index 65f5a4e..ed94763 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -61,6 +61,13 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider {
   long getInstanceMessageQueueBacklog();
 
   /**
+   * Total count of all messages that have not been completed
+   * after their expected completion time for instances in this cluster
+   * @return the total number of past-due messages across the instances in this cluster
+   */
+  long getTotalPastDueMessageGauge();
+
+  /**
    * @return 1 if cluster is enabled, otherwise 0
    */
   long getEnabled();
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index e0c0f89..86151e7 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -48,7 +48,9 @@ public class InstanceMonitor extends DynamicMBeanProvider {
     ENABLED_STATUS_GAUGE("Enabled"),
     ONLINE_STATUS_GAUGE("Online"),
     DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
-    MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge");
+    MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge"),
+    MESSAGE_QUEUE_SIZE_GAUGE("MessageQueueSizeGauge"),
+    PASTDUE_MESSAGE_GAUGE("PastDueMessageGauge");
 
     private final String metricName;
 
@@ -75,6 +77,8 @@ public class InstanceMonitor extends DynamicMBeanProvider {
   private SimpleDynamicMetric<Long> _disabledPartitionsGauge;
   private SimpleDynamicMetric<Long> _onlineStatusGauge;
   private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;
+  private SimpleDynamicMetric<Long> _messageQueueSizeGauge;
+  private SimpleDynamicMetric<Long> _pastDueMessageGauge;
 
   // A map of dynamic capacity Gauges. The map's keys could change.
   private final Map<String, SimpleDynamicMetric<Long>> _dynamicCapacityMetricsMap;
@@ -108,6 +112,12 @@ public class InstanceMonitor extends DynamicMBeanProvider {
     _maxCapacityUsageGauge =
         new SimpleDynamicMetric<>(InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName(),
             0.0d);
+    _messageQueueSizeGauge =
+        new SimpleDynamicMetric<>(InstanceMonitorMetric.MESSAGE_QUEUE_SIZE_GAUGE.metricName(),
+            0L);
+    _pastDueMessageGauge =
+        new SimpleDynamicMetric<>(InstanceMonitorMetric.PASTDUE_MESSAGE_GAUGE.metricName(),
+            0L);
   }
 
   private List<DynamicMetric<?, ?>> buildAttributeList() {
@@ -116,7 +126,9 @@ public class InstanceMonitor extends DynamicMBeanProvider {
         _disabledPartitionsGauge,
         _enabledStatusGauge,
         _onlineStatusGauge,
-        _maxCapacityUsageGauge
+        _maxCapacityUsageGauge,
+        _messageQueueSizeGauge,
+        _pastDueMessageGauge
     );
 
     attributeList.addAll(_dynamicCapacityMetricsMap.values());
@@ -146,6 +158,10 @@ public class InstanceMonitor extends DynamicMBeanProvider {
     return _disabledPartitionsGauge.getValue();
   }
 
+  protected long getMessageQueueSizeGauge() { return _messageQueueSizeGauge.getValue(); }
+
+  protected long getPastDueMessageGauge() { return _pastDueMessageGauge.getValue(); }
+
   /**
    * Get the name of the monitored instance
    * @return instance name as a string
@@ -210,6 +226,22 @@ public class InstanceMonitor extends DynamicMBeanProvider {
   }
 
   /**
+   * Updates message queue size for this instance.
+   * @param queueSize message queue size of this instance
+   */
+  public synchronized void updateMessageQueueSize(long queueSize) {
+    _messageQueueSizeGauge.updateValue(queueSize);
+  }
+
+  /**
+   * Updates the number of messages that have not been completed after their expected completion time for this instance.
+   * @param msgCount count of messages that have not been completed after their due completion time
+   */
+  public synchronized void updatePastDueMessageGauge(long msgCount) {
+    _pastDueMessageGauge.updateValue(msgCount);
+  }
+
+  /**
    * Gets max capacity usage of this instance.
    * @return Max capacity usage of this instance.
    */
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index dee1f86..789992f 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -29,6 +29,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
 import javax.management.AttributeNotFoundException;
 import javax.management.InstanceNotFoundException;
 import javax.management.JMException;
@@ -56,6 +58,8 @@ import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
 
 public class TestClusterStatusMonitor {
   private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
@@ -164,6 +168,84 @@ public class TestClusterStatusMonitor {
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
+  @Test()
+  public void testMessageMetrics() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 5;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+    monitor.active();
+    ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+
+    Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
+
+    Map<String, Set<Message>> instanceMessageMap = Maps.newHashMap();
+    Set<String> liveInstanceSet = Sets.newHashSet();
+    for (int i = 0; i < n; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      liveInstanceSet.add(instanceName);
+
+      long now = System.currentTimeMillis();
+      Set<Message> messages = Sets.newHashSet();
+      // add 10 regular messages to each instance
+      for (int j = 0; j < 10; j++) {
+        Message m = new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
+        m.setTgtName(instanceName);
+        messages.add(m);
+      }
+
+      // add 10 past-due messages to each instance (using default completion period)
+      for (int j = 0; j < 10; j++) {
+        Message m = new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
+        m.setTgtName(instanceName);
+        m.setCreateTimeStamp(now - Message.MESSAGE_EXPECT_COMPLETION_PERIOD - 1000);
+        messages.add(m);
+      }
+
+      // add 5 more past-due messages to each instance (using an explicitly set completion due timestamp in the message)
+      for (int j = 0; j < 5; j++) {
+        Message m = new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
+        m.setTgtName(instanceName);
+        m.setCompletionDueTimeStamp(now - 1000);
+        messages.add(m);
+      }
+      instanceMessageMap.put(instanceName, messages);
+    }
+
+    monitor.setClusterInstanceStatus(liveInstanceSet, liveInstanceSet, Collections.emptySet(),
+        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), instanceMessageMap);
+
+    Assert.assertEquals(monitor.getInstanceMessageQueueBacklog(), 25 * n);
+    Assert.assertEquals(monitor.getTotalPastDueMessageGauge(), 15 * n);
+
+    Object totalMsgSize =
+        _server.getAttribute(clusterMonitorObjName, "InstanceMessageQueueBacklog");
+    Assert.assertTrue(totalMsgSize instanceof Long);
+    Assert.assertEquals((long) totalMsgSize, 25 * n);
+
+    Object totalPastdueMsgCount =
+        _server.getAttribute(clusterMonitorObjName, "TotalPastDueMessageGauge");
+    Assert.assertTrue(totalPastdueMsgCount instanceof Long);
+    Assert.assertEquals((long) totalPastdueMsgCount, 15 * n);
+
+    for (String instance : liveInstanceSet) {
+      ObjectName objName =
+          monitor.getObjectName(monitor.getInstanceBeanName(instance));
+      Object messageSize = _server.getAttribute(objName, "MessageQueueSizeGauge");
+      Assert.assertTrue(messageSize instanceof Long);
+      Assert.assertEquals((long) messageSize, 25L);
+
+      Object pastdueMsgCount = _server.getAttribute(objName, "PastDueMessageGauge");
+      Assert.assertTrue(pastdueMsgCount instanceof Long);
+      Assert.assertEquals((long) pastdueMsgCount, 15L);
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
 
   @Test
   public void testResourceAggregation() throws JMException, IOException {
@@ -352,7 +434,7 @@ public class TestClusterStatusMonitor {
     // Call setClusterInstanceStatus to register instance monitors.
     monitor.setClusterInstanceStatus(maxUsageMap.keySet(), maxUsageMap.keySet(),
         Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap(),
-        Collections.emptyMap());
+        Collections.emptyMap(), Collections.emptyMap());
 
     // Update instance capacity status.
     for (Map.Entry<String, Double> usageEntry : maxUsageMap.entrySet()) {
@@ -435,4 +517,31 @@ public class TestClusterStatusMonitor {
       }
     }
   }
+
+  private void verifyMessageMetrics(ClusterStatusMonitor monitor, Map<String, Double> maxUsageMap,
+      Map<String, Map<String, Integer>> instanceCapacityMap)
+      throws MalformedObjectNameException, IOException, AttributeNotFoundException, MBeanException,
+             ReflectionException, InstanceNotFoundException {
+    // Verify results.
+    for (Map.Entry<String, Map<String, Integer>> instanceEntry : instanceCapacityMap.entrySet()) {
+      String instance = instanceEntry.getKey();
+      Map<String, Integer> capacityMap = instanceEntry.getValue();
+      String instanceBeanName = String
+          .format("%s,%s=%s", monitor.clusterBeanName(), ClusterStatusMonitor.INSTANCE_DN_KEY,
+              instance);
+      ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+
+      Assert.assertTrue(_server.isRegistered(instanceObjectName));
+      Assert.assertEquals(_server.getAttribute(instanceObjectName,
+          InstanceMonitor.InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName()),
+          maxUsageMap.get(instance));
+
+      for (Map.Entry<String, Integer> capacityEntry : capacityMap.entrySet()) {
+        String capacityKey = capacityEntry.getKey();
+        String attributeName = capacityKey + "Gauge";
+        Assert.assertEquals((long) _server.getAttribute(instanceObjectName, attributeName),
+            (long) instanceCapacityMap.get(instance).get(capacityKey));
+      }
+    }
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
index 609581b..7cac92e 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestInstanceMonitor.java
@@ -59,6 +59,8 @@ public class TestInstanceMonitor {
     monitor.updateMaxCapacityUsage(0.5d);
     monitor.increaseMessageCount(10L);
     monitor.updateInstance(tags, disabledPartitions, Collections.emptyList(), true, true);
+    monitor.updateMessageQueueSize(100L);
+    monitor.updatePastDueMessageGauge(50L);
 
     // Verify metrics.
     Assert.assertEquals(monitor.getTotalMessageReceived(), 10L);
@@ -69,6 +71,8 @@ public class TestInstanceMonitor {
     Assert.assertEquals(monitor.getEnabled(), 1L);
     Assert.assertEquals(monitor.getDisabledPartitions(), 2L);
     Assert.assertEquals(monitor.getMaxCapacityUsageGauge(), 0.5d);
+    Assert.assertEquals(monitor.getMessageQueueSizeGauge(), 100L);
+    Assert.assertEquals(monitor.getPastDueMessageGauge(), 50L);
 
     monitor.unregister();
   }