You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/07/10 18:32:41 UTC

[1/2] helix git commit: Fix resource monitor race condition. [Forced Update!]

Repository: helix
Updated Branches:
  refs/heads/master f9f6cfed3 -> 96eb69186 (forced update)


Fix resource monitor race condition.

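Asynchronous monitor processing can race with resource removal and make resource MBean deletion fail: the async pending-message update calls getOrCreateResourceMonitor, which may re-register a resource's MBean after that resource has already been dropped. This leaves stale MBeans registered in the MBean server.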


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/96eb6918
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/96eb6918
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/96eb6918

Branch: refs/heads/master
Commit: 96eb69186072b1386eea875ffb0fbdcba6070453
Parents: 5245f3b
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Tue May 29 17:09:28 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Tue Jul 10 11:29:49 2018 -0700

----------------------------------------------------------------------
 .../stages/CurrentStateComputationStage.java      | 18 ++----------------
 .../stages/ExternalViewComputeStage.java          |  7 +++++++
 .../monitoring/mbeans/ClusterStatusMonitor.java   | 10 +++++++---
 3 files changed, 16 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/96eb6918/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 4d9318c..64cd4f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -31,7 +31,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 
 
 /**
@@ -73,21 +72,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     }
 
     if (!cache.isTaskCache()) {
-      final ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name());
-      asyncExecute(cache.getAsyncTasksThreadPool(), new Callable<Object>() {
-        @Override
-        public Object call() {
-          for (Resource resource : resourceMap.values()) {
-            int totalPendingMessageCount = 0;
-            for (Partition partition : resource.getPartitions()) {
-              totalPendingMessageCount +=
-                  currentStateOutput.getPendingMessageMap(resource.getResourceName(), partition).size();
-            }
-            clusterStatusMonitor.updatePendingMessages(resource.getResourceName(), totalPendingMessageCount);
-          }
-          return null;
-        }
-      });
+      ClusterStatusMonitor clusterStatusMonitor =
+          event.getAttribute(AttributeName.clusterStatusMonitor.name());
       // TODO Update the status async -- jjwang
       updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/96eb6918/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 591867d..1f455cb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -75,6 +75,8 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
         view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
       }
 
+      int totalPendingMessageCount = 0;
+
       for (Partition partition : resource.getPartitions()) {
         Map<String, String> currentStateMap =
             currentStateOutput.getCurrentStateMap(resourceName, partition);
@@ -88,7 +90,10 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
             // }
           }
         }
+        totalPendingMessageCount +=
+            currentStateOutput.getPendingMessageMap(resource.getResourceName(), partition).size();
       }
+
       // Update cluster status monitor mbean
       IdealState idealState = cache.getIdealState(resourceName);
       if (!cache.isTaskCache()) {
@@ -105,6 +110,8 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
               clusterStatusMonitor
                   .setResourceStatus(view, cache.getIdealState(view.getResourceName()),
                       stateModelDef);
+              clusterStatusMonitor
+                  .updatePendingMessages(resource.getResourceName(), totalPendingMessageCount);
             }
           } else {
             // Drop the metrics if the resource is dropped, or the MonitorDisabled is changed to true.

http://git-wip-us.apache.org/repos/asf/helix/blob/96eb6918/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 133d8ce..5d398eb 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -448,10 +448,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   public synchronized void updatePendingMessages(String resourceName, int messageCount) {
-    ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
+    try {
+      ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
-    if (resourceMonitor != null) {
-      resourceMonitor.updatePendingStateTransitionMessages(messageCount);
+      if (resourceMonitor != null) {
+        resourceMonitor.updatePendingStateTransitionMessages(messageCount);
+      }
+    } catch (Exception e) {
+      LOG.error("Fail to update resource pending messages, resource: " + resourceName, e);
     }
   }
 


[2/2] helix git commit: Add new monitor metrics for state transitions.

Posted by jx...@apache.org.
Add new monitor metrics for state transitions.

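ClusterStatus: MissingMinActiveReplicaPartitionGauge
ClusterStatus: TotalResourceGauge
ClusterStatus/ResourceStatus: PendingStateTransitionGauge (note: the cluster-level getter is spelled getPendingStateTransitionGuage, so ClusterStatus exposes this attribute as PendingStateTransitionGuage)
ClusterStatus: StateTransitionCounter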


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5245f3b8
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5245f3b8
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5245f3b8

Branch: refs/heads/master
Commit: 5245f3b877214b9a588808c000a65bb951643408
Parents: 35fcfa0
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri May 25 23:30:11 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Tue Jul 10 11:29:49 2018 -0700

----------------------------------------------------------------------
 .../stages/CurrentStateComputationStage.java    |  23 +++-
 .../monitoring/mbeans/ClusterStatusMonitor.java |  42 +++++-
 .../mbeans/ClusterStatusMonitorMBean.java       |  20 +++
 .../monitoring/mbeans/ResourceMonitor.java      |  12 ++
 .../mbeans/TestClusterStatusMonitor.java        | 134 +++++++++++++++++--
 .../monitoring/mbeans/TestResourceMonitor.java  |  18 ++-
 6 files changed, 230 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/5245f3b8/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 8d33c7c..4d9318c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -31,6 +31,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+
 
 /**
  * For each LiveInstances select currentState and message whose sessionId matches
@@ -47,7 +49,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
-    Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
+    final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
 
     if (cache == null || resourceMap == null) {
       throw new StageException("Missing attributes in event:" + event
@@ -55,7 +57,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     }
 
     Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    final CurrentStateOutput currentStateOutput = new CurrentStateOutput();
 
     for (LiveInstance instance : liveInstances.values()) {
       String instanceName = instance.getInstanceName();
@@ -71,8 +73,21 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     }
 
     if (!cache.isTaskCache()) {
-      ClusterStatusMonitor clusterStatusMonitor =
-          event.getAttribute(AttributeName.clusterStatusMonitor.name());
+      final ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name());
+      asyncExecute(cache.getAsyncTasksThreadPool(), new Callable<Object>() {
+        @Override
+        public Object call() {
+          for (Resource resource : resourceMap.values()) {
+            int totalPendingMessageCount = 0;
+            for (Partition partition : resource.getPartitions()) {
+              totalPendingMessageCount +=
+                  currentStateOutput.getPendingMessageMap(resource.getResourceName(), partition).size();
+            }
+            clusterStatusMonitor.updatePendingMessages(resource.getResourceName(), totalPendingMessageCount);
+          }
+          return null;
+        }
+      });
       // TODO Update the status async -- jjwang
       updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/5245f3b8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index b2c5fb0..133d8ce 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -447,6 +447,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
+  public synchronized void updatePendingMessages(String resourceName, int messageCount) {
+    ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
+
+    if (resourceMonitor != null) {
+      resourceMonitor.updatePendingStateTransitionMessages(messageCount);
+    }
+  }
+
   private ResourceMonitor getOrCreateResourceMonitor(String resourceName) {
     try {
       if (!_resourceMbeanMap.containsKey(resourceName)) {
@@ -797,6 +805,11 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   @Override
+  public long getTotalResourceGauge() {
+    return _resourceMbeanMap.size();
+  }
+
+  @Override
   public long getTotalPartitionGauge() {
     long total = 0;
     for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
@@ -824,6 +837,15 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   @Override
+  public long getMissingMinActiveReplicaPartitionGauge() {
+    long total = 0;
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+      total += entry.getValue().getMissingMinActiveReplicaPartitionGauge();
+    }
+    return total;
+  }
+
+  @Override
   public long getDifferenceWithIdealStateGauge() {
     long total = 0;
     for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
@@ -831,4 +853,22 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
     return total;
   }
-}
\ No newline at end of file
+
+  @Override
+  public long getStateTransitionCounter() {
+    long total = 0;
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+      total += entry.getValue().getTotalMessageReceived();
+    }
+    return total;
+  }
+
+  @Override
+  public long getPendingStateTransitionGuage() {
+    long total = 0;
+    for (Map.Entry<String, ResourceMonitor> entry : _resourceMbeanMap.entrySet()) {
+      total += entry.getValue().getNumPendingStateTransitionGauge();
+    }
+    return total;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/5245f3b8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index 81600cb..9f1d786 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -82,6 +82,11 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider {
   long getRebalanceFailureCounter();
 
   /**
+   * @return number of all resources in this cluster
+   */
+  long getTotalResourceGauge();
+
+  /**
    * @return number of all partitions in this cluster
    */
   long getTotalPartitionGauge();
@@ -97,7 +102,22 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider {
   long getMissingTopStatePartitionGauge();
 
   /**
+   * @return number of all partitions in this cluster without enough active replicas
+   */
+  long getMissingMinActiveReplicaPartitionGauge();
+
+  /**
    * @return number of all partitions in this cluster whose ExternalView and IdealState have discrepancies
    */
   long getDifferenceWithIdealStateGauge();
+
+  /**
+   * @return number of sent state transition messages in this cluster
+   */
+  long getStateTransitionCounter();
+
+  /**
+   * @return number of pending state transitions in this cluster
+   */
+  long getPendingStateTransitionGuage();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/5245f3b8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index d4b46b1..a103a0c 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -49,6 +49,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
   private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions;
   private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions;
   private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions;
+  private SimpleDynamicMetric<Long> _numPendingStateTransitions;
 
   // Counters
   private SimpleDynamicMetric<Long> _successfulTopStateHandoffDurationCounter;
@@ -86,6 +87,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     attributeList.add(_maxSinglePartitionTopStateHandoffDuration);
     attributeList.add(_partitionTopStateHandoffDurationGauge);
     attributeList.add(_totalMessageReceived);
+    attributeList.add(_numPendingStateTransitions);
     doRegister(attributeList, _initObjectName);
     return this;
   }
@@ -115,6 +117,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _numOfErrorPartitions = new SimpleDynamicMetric("ErrorPartitionGauge", 0L);
     _numOfPartitionsInExternalView = new SimpleDynamicMetric("ExternalViewPartitionGauge", 0L);
     _numOfPartitions = new SimpleDynamicMetric("PartitionGauge", 0L);
+    _numPendingStateTransitions = new SimpleDynamicMetric("PendingStateTransitionGauge", 0L);
 
     _partitionTopStateHandoffDurationGauge =
         new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram(
@@ -300,6 +303,11 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _numPendingLoadRebalancePartitions.updateValue(0L);
     _numRecoveryRebalanceThrottledPartitions.updateValue(0L);
     _numLoadRebalanceThrottledPartitions.updateValue(0L);
+    _numPendingStateTransitions.updateValue(0L);
+  }
+
+  public void updatePendingStateTransitionMessages(int messageCount) {
+    _numPendingStateTransitions.updateValue((long) messageCount);
   }
 
   public void updateStateHandoffStats(MonitorState monitorState, long duration, boolean succeeded) {
@@ -361,6 +369,10 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     return _numLoadRebalanceThrottledPartitions.getValue();
   }
 
+  public long getNumPendingStateTransitionGauge() {
+    return _numPendingStateTransitions.getValue();
+  }
+
   public void resetMaxTopStateHandoffGauge() {
     if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS <= System.currentTimeMillis()) {
       _maxSinglePartitionTopStateHandoffDuration.updateValue(0L);

http://git-wip-us.apache.org/repos/asf/helix/blob/5245f3b8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index 755997b..fa45c65 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -20,19 +20,18 @@ package org.apache.helix.monitoring.mbeans;
  */
 
 import java.lang.management.ManagementFactory;
-import java.util.Date;
-import java.util.Map;
+import java.util.*;
 
 import javax.management.InstanceNotFoundException;
+import javax.management.JMException;
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
 
 import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.*;
+import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -41,6 +40,8 @@ import com.beust.jcommander.internal.Maps;
 
 public class TestClusterStatusMonitor {
   private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
+  String testDB = "TestDB";
+  String testDB_0 = testDB + "_0";
 
   @Test()
   public void testReportData() throws Exception {
@@ -48,8 +49,6 @@ public class TestClusterStatusMonitor {
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
     int n = 5;
-    String testDB = "TestDB";
-    String testDB_0 = testDB + "_0";
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
@@ -161,4 +160,123 @@ public class TestClusterStatusMonitor {
 
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
+
+
+  @Test
+  public void testResourceAggregation() throws JMException {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ClusterStatusMonitor monitor = new ClusterStatusMonitor(clusterName);
+    monitor.active();
+    ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+    try {
+      _server.getMBeanInfo(clusterMonitorObjName);
+    } catch (Exception e) {
+      Assert.fail("Fail to register ClusterStatusMonitor");
+    }
+
+    int numInstance = 5;
+    int numPartition = 10;
+    int numReplica = 3;
+    List<String> instances = new ArrayList<String>();
+    for (int i = 0; i < numInstance; i++) {
+      String instance = "localhost_" + (12918 + i);
+      instances.add(instance);
+    }
+
+    ZNRecord idealStateRecord = DefaultIdealStateCalculator
+        .calculateIdealState(instances, numPartition, numReplica, testDB, "MASTER", "SLAVE");
+    IdealState idealState = new IdealState(TestResourceMonitor.deepCopyZNRecord(idealStateRecord));
+    idealState.setMinActiveReplicas(numReplica);
+    ExternalView externalView = new ExternalView(TestResourceMonitor.deepCopyZNRecord(idealStateRecord));
+    StateModelDefinition stateModelDef =
+        BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition();
+
+    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+
+    Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
+    Assert.assertEquals(monitor.getTotalResourceGauge(), 1);
+    Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
+    Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
+    Assert.assertEquals(monitor.getStateTransitionCounter(), 0);
+    Assert.assertEquals(monitor.getPendingStateTransitionGuage(), 0);
+
+
+    int lessMinActiveReplica = 6;
+    Random r = new Random();
+    externalView = new ExternalView(TestResourceMonitor.deepCopyZNRecord(idealStateRecord));
+    int start = r.nextInt(numPartition - lessMinActiveReplica - 1);
+    for (int i = start; i < start + lessMinActiveReplica; i++) {
+      String partition = testDB + "_" + i;
+      Map<String, String> map = externalView.getStateMap(partition);
+      Iterator<String> it = map.keySet().iterator();
+      int flag = 0;
+      while (it.hasNext()) {
+        String key = it.next();
+        if (map.get(key).equalsIgnoreCase("SLAVE")) {
+          if (flag++ % 2 == 0) {
+            map.put(key, "OFFLINE");
+          } else {
+            it.remove();
+          }
+        }
+      }
+      externalView.setStateMap(partition, map);
+    }
+
+    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
+    Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), lessMinActiveReplica);
+    Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), 0);
+    Assert.assertEquals(monitor.getStateTransitionCounter(), 0);
+    Assert.assertEquals(monitor.getPendingStateTransitionGuage(), 0);
+
+    int missTopState = 7;
+    externalView = new ExternalView(TestResourceMonitor.deepCopyZNRecord(idealStateRecord));
+    start = r.nextInt(numPartition - missTopState - 1);
+    for (int i = start; i < start + missTopState; i++) {
+      String partition = testDB + "_" + i;
+      Map<String, String> map = externalView.getStateMap(partition);
+      int flag = 0;
+      for (String key : map.keySet()) {
+        if (map.get(key).equalsIgnoreCase("MASTER")) {
+          if (flag++ % 2 == 0) {
+            map.put(key, "OFFLINE");
+          } else {
+            map.remove(key);
+          }
+          break;
+        }
+      }
+      externalView.setStateMap(partition, map);
+    }
+
+    monitor.setResourceStatus(externalView, idealState, stateModelDef);
+    Assert.assertEquals(monitor.getTotalPartitionGauge(), numPartition);
+    Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
+    Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), missTopState);
+    Assert.assertEquals(monitor.getStateTransitionCounter(), 0);
+    Assert.assertEquals(monitor.getPendingStateTransitionGuage(), 0);
+
+    int messageCount = 4;
+    List<Message> messages = new ArrayList<>();
+    for (int i = 0; i < messageCount; i++) {
+      Message message = new Message(Message.MessageType.STATE_TRANSITION, "message" + i);
+      message.setResourceName(testDB);
+      message.setTgtName(instances.get(i % instances.size()));
+      messages.add(message);
+    }
+    monitor.increaseMessageReceived(messages);
+    Assert.assertEquals(monitor.getStateTransitionCounter(), messageCount);
+    Assert.assertEquals(monitor.getPendingStateTransitionGuage(), 0);
+
+    // test pending state transition message report and read
+    messageCount = new Random().nextInt(numPartition) + 1;
+    monitor.updatePendingMessages(testDB, messageCount);
+    Assert.assertEquals(monitor.getPendingStateTransitionGuage(), messageCount);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/5245f3b8/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
index 5c52195..5f3d504 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestResourceMonitor.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
 import javax.management.JMException;
-import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -44,12 +43,13 @@ public class TestResourceMonitor {
   int _replicas = 3;
   int _partitions = 50;
 
-  @Test() public void testReportData() throws JMException {
+  @Test()
+  public void testReportData() throws JMException {
     final int n = 5;
     ResourceMonitor monitor = new ResourceMonitor(_clusterName, _dbName, new ObjectName("testDomain:key=value"));
     monitor.register();
 
-    List<String> instances = new ArrayList<String>();
+    List<String> instances = new ArrayList<>();
     for (int i = 0; i < n; i++) {
       String instance = "localhost_" + (12918 + i);
       instances.add(instance);
@@ -191,6 +191,12 @@ public class TestResourceMonitor {
     Assert.assertEquals(monitor.getMissingMinActiveReplicaPartitionGauge(), 0);
     Assert.assertEquals(monitor.getMissingReplicaPartitionGauge(), missTopState);
     Assert.assertEquals(monitor.getMissingTopStatePartitionGauge(), missTopState);
+
+    Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), 0);
+    // test pending state transition message report and read
+    int messageCount = new Random().nextInt(_partitions) + 1;
+    monitor.updatePendingStateTransitionMessages(messageCount);
+    Assert.assertEquals(monitor.getNumPendingStateTransitionGauge(), messageCount);
   }
 
   /**
@@ -198,17 +204,17 @@ public class TestResourceMonitor {
    *
    * @return
    */
-  public ZNRecord deepCopyZNRecord(ZNRecord record) {
+  public static ZNRecord deepCopyZNRecord(ZNRecord record) {
     ZNRecord copy = new ZNRecord(record.getId());
 
     copy.getSimpleFields().putAll(record.getSimpleFields());
     for (String mapKey : record.getMapFields().keySet()) {
       Map<String, String> mapField = record.getMapFields().get(mapKey);
-      copy.getMapFields().put(mapKey, new TreeMap<String, String>(mapField));
+      copy.getMapFields().put(mapKey, new TreeMap<>(mapField));
     }
 
     for (String listKey : record.getListFields().keySet()) {
-      copy.getListFields().put(listKey, new ArrayList<String>(record.getListFields().get(listKey)));
+      copy.getListFields().put(listKey, new ArrayList<>(record.getListFields().get(listKey)));
     }
     if (record.getRawPayload() != null) {
       byte[] rawPayload = new byte[record.getRawPayload().length];