You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/06/27 00:46:47 UTC

helix git commit: Fix topstate handoff metrics.

Repository: helix
Updated Branches:
  refs/heads/master 266b8bb1a -> 7753b602c


Fix topstate handoff metrics.

We've confirmed a bug in the logic that calculates topstate handoff duration.
With this issue, if the previous master instance is offline, an older handoff start time could be used to calculate the duration.
This results in huge handoff duration in the Helix metrics.
This change will fix this bug. If the previous node that holds topstate replica goes to offline, the offline time will be used as the start time.

RB=1295351
G=helix-reviewers
A=lxia,hrzhang


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7753b602
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7753b602
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7753b602

Branch: refs/heads/master
Commit: 7753b602cee6d08a8326a68f899cb089378aae9f
Parents: 266b8bb
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Apr 27 10:56:43 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Tue Jun 26 16:34:28 2018 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     |   6 +
 .../stages/CurrentStateComputationStage.java    | 158 +++++---
 .../mbeans/TestTopStateHandoffMetrics.java      | 219 +++++++++--
 .../resources/TestTopStateHandoffMetrics.json   | 370 +++++++++++--------
 4 files changed, 529 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7753b602/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 386150d..5f87317 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -85,6 +85,7 @@ public class ClusterDataCache {
   private Map<String, ClusterConstraints> _constraintMap;
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private Map<String, Map<String, Long>> _missingTopStateMap = new HashMap<>();
+  private Map<String, Map<String, String>> _lastTopStateLocationMap = new HashMap<>();
   private Map<String, ExternalView> _targetExternalViewMap = new HashMap<>();
   private Map<String, ExternalView> _externalViewMap = new HashMap<>();
   private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
@@ -632,6 +633,10 @@ public class ClusterDataCache {
     return _missingTopStateMap;
   }
 
+  public Map<String, Map<String, String>> getLastTopStateLocationMap() {
+    return _lastTopStateLocationMap;
+  }
+
   public Integer getParticipantActiveTaskCount(String instance) {
     return _participantActiveTaskCount.get(instance);
   }
@@ -900,6 +905,7 @@ public class ClusterDataCache {
 
   public void clearMonitoringRecords() {
     _missingTopStateMap.clear();
+    _lastTopStateLocationMap.clear();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/7753b602/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index edf2c47..8d33c7c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -19,23 +19,19 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
+import org.apache.helix.model.*;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * For each LiveInstances select currentState and message whose sessionId matches
  * sessionId from LiveInstance Get Partition,State for all the resources computed in
@@ -77,7 +73,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     if (!cache.isTaskCache()) {
       ClusterStatusMonitor clusterStatusMonitor =
           event.getAttribute(AttributeName.clusterStatusMonitor.name());
-      updateMissingTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput);
+      // TODO Update the status async -- jjwang
+      updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput);
     }
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
   }
@@ -174,15 +171,22 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       currentStateOutput.setCancellationState(resourceName, partition, instanceName, message);
     }
   }
-  private void updateMissingTopStateStatus(ClusterDataCache cache,
+
+  private void updateTopStateStatus(ClusterDataCache cache,
       ClusterStatusMonitor clusterStatusMonitor, Map<String, Resource> resourceMap,
       CurrentStateOutput currentStateOutput) {
     Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+
     long durationThreshold = Long.MAX_VALUE;
     if (cache.getClusterConfig() != null) {
       durationThreshold = cache.getClusterConfig().getMissTopStateDurationThreshold();
     }
 
+    // Remove any resource records that no longer exists
+    missingTopStateMap.keySet().retainAll(resourceMap.keySet());
+    lastTopStateMap.keySet().retainAll(resourceMap.keySet());
+
     for (Resource resource : resourceMap.values()) {
       StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
       if (stateModelDef == null || resource.getStateModelDefRef()
@@ -191,26 +195,33 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
         continue;
       }
 
+      String resourceName = resource.getResourceName();
+
       for (Partition partition : resource.getPartitions()) {
         Map<String, String> stateMap =
-            currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
-
-        // TODO: improve following with MIN_ACTIVE_TOP_STATE logic
-        // Missing top state need to record
-        if (!stateMap.values().contains(stateModelDef.getTopState()) && (!missingTopStateMap
-            .containsKey(resource.getResourceName()) || !missingTopStateMap
-            .get(resource.getResourceName()).containsKey(partition.getPartitionName()))) {
-          reportNewTopStateMissing(cache, stateMap, missingTopStateMap, resource, partition,
-              stateModelDef.getTopState());
+            currentStateOutput.getCurrentStateMap(resourceName, partition);
+
+        for (String instance : stateMap.keySet()) {
+          if (stateMap.get(instance).equals(stateModelDef.getTopState())) {
+            if (!lastTopStateMap.containsKey(resourceName)) {
+              lastTopStateMap.put(resourceName, new HashMap<String, String>());
+            }
+            // recording any top state, it is enough for tracking the top state changes.
+            lastTopStateMap.get(resourceName).put(partition.getPartitionName(), instance);
+            break;
+          }
         }
 
-        // Top state comes back
-        // The first time participant started or controller switched will be ignored
-        if (missingTopStateMap.containsKey(resource.getResourceName()) && missingTopStateMap
-            .get(resource.getResourceName()).containsKey(partition.getPartitionName()) && stateMap
-            .values().contains(stateModelDef.getTopState())) {
-          reportTopStateComesBack(cache, stateMap, missingTopStateMap, resource, partition,
+        if (stateMap.values().contains(stateModelDef.getTopState())) {
+          // Top state comes back
+          // The first time participant started or controller switched will be ignored
+          reportTopStateComesBack(cache, stateMap, missingTopStateMap, resourceName, partition,
               clusterStatusMonitor, durationThreshold, stateModelDef.getTopState());
+        } else {
+          // TODO: improve following with MIN_ACTIVE_TOP_STATE logic
+          // Missing top state need to record
+          reportNewTopStateMissing(cache, missingTopStateMap, lastTopStateMap, resourceName,
+              partition, stateModelDef.getTopState(), currentStateOutput);
         }
       }
     }
@@ -232,19 +243,32 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     }
   }
 
-  private void reportNewTopStateMissing(ClusterDataCache cache, Map<String, String> stateMap,
-      Map<String, Map<String, Long>> missingTopStateMap, Resource resource, Partition partition,
-      String topState) {
+  private void reportNewTopStateMissing(ClusterDataCache cache,
+      Map<String, Map<String, Long>> missingTopStateMap,
+      Map<String, Map<String, String>> lastTopStateMap, String resourceName, Partition partition,
+      String topState, CurrentStateOutput currentStateOutput) {
+    if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
+        .containsKey(partition.getPartitionName())) {
+      // a previous missing has been already recorded
+      return;
+    }
 
     long startTime = NOT_RECORDED;
-    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    for (String instanceName : stateMap.keySet()) {
-      if (liveInstances.containsKey(instanceName)) {
-        CurrentState currentState =
-            cache.getCurrentState(instanceName, liveInstances.get(instanceName).getSessionId())
-                .get(resource.getResourceName());
 
-        if (currentState.getPreviousState(partition.getPartitionName()) != null && currentState
+    // 1. try to find the previous topstate missing event for the startTime.
+    String missingStateInstance = null;
+    if (lastTopStateMap.containsKey(resourceName)) {
+      missingStateInstance = lastTopStateMap.get(resourceName).get(partition.getPartitionName());
+    }
+
+    if (missingStateInstance != null) {
+      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+      if (liveInstances.containsKey(missingStateInstance)) {
+        CurrentState currentState = cache.getCurrentState(missingStateInstance,
+            liveInstances.get(missingStateInstance).getSessionId()).get(resourceName);
+
+        if (currentState != null
+            && currentState.getPreviousState(partition.getPartitionName()) != null && currentState
             .getPreviousState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
           // Update the latest start time only from top state to other state transition
           // At beginning, the start time should -1 (not recorded). If something happen either
@@ -254,32 +278,55 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
           // Previous state is top state does not mean that resource has only one top state
           // (i.e. Online/Offline). So Helix has to find the latest start time as the staring point.
           startTime = Math.max(startTime, currentState.getStartTime(partition.getPartitionName()));
+        } // Else no related state transition history found, use current time as the missing start time.
+      } else {
+        // If the previous topState holder is no longer alive, the offline time is used as start time.
+        Map<String, Long> offlineMap = cache.getInstanceOfflineTimeMap();
+        if (offlineMap.containsKey(missingStateInstance)) {
+          startTime = Math.max(startTime, offlineMap.get(missingStateInstance));
         }
       }
     }
 
+    // 2. if no previous topstate records, use any pending message that are created for topstate transition
     if (startTime == NOT_RECORDED) {
+      for (Message message : currentStateOutput.getPendingMessageMap(resourceName, partition)
+          .values()) {
+        // Only messages that match the current session ID will be recorded in the map.
+        // So no need to redundantly check here.
+        if (message.getToState().equals(topState)) {
+          startTime = Math.max(startTime, message.getCreateTimeStamp());
+        }
+      }
+    }
+
+    // 3. if no clue about previous topstate or any related pending message, use the current system time.
+    if (startTime == NOT_RECORDED) {
+      LOG.warn("Cannot confirm top state missing start time. Use the current system time as the start time.");
       startTime = System.currentTimeMillis();
     }
 
-    if (!missingTopStateMap.containsKey(resource.getResourceName())) {
-      missingTopStateMap.put(resource.getResourceName(), new HashMap<String, Long>());
+    if (!missingTopStateMap.containsKey(resourceName)) {
+      missingTopStateMap.put(resourceName, new HashMap<String, Long>());
     }
 
-    Map<String, Long> partitionMap = missingTopStateMap.get(resource.getResourceName());
+    Map<String, Long> partitionMap = missingTopStateMap.get(resourceName);
     // Update the new partition without top state
     if (!partitionMap.containsKey(partition.getPartitionName())) {
-      missingTopStateMap.get(resource.getResourceName())
-          .put(partition.getPartitionName(), startTime);
+      partitionMap.put(partition.getPartitionName(), startTime);
     }
   }
 
   private void reportTopStateComesBack(ClusterDataCache cache, Map<String, String> stateMap,
-      Map<String, Map<String, Long>> missingTopStateMap, Resource resource, Partition partition,
+      Map<String, Map<String, Long>> missingTopStateMap, String resourceName, Partition partition,
       ClusterStatusMonitor clusterStatusMonitor, long threshold, String topState) {
+    if (!missingTopStateMap.containsKey(resourceName) || !missingTopStateMap.get(resourceName)
+        .containsKey(partition.getPartitionName())) {
+      // there is no previous missing recorded
+      return;
+    }
 
-    long handOffStartTime =
-        missingTopStateMap.get(resource.getResourceName()).get(partition.getPartitionName());
+    long handOffStartTime = missingTopStateMap.get(resourceName).get(partition.getPartitionName());
 
     // Find the earliest end time from the top states
     long handOffEndTime = System.currentTimeMillis();
@@ -287,32 +334,33 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     for (String instanceName : stateMap.keySet()) {
       CurrentState currentState =
           cache.getCurrentState(instanceName, liveInstances.get(instanceName).getSessionId())
-              .get(resource.getResourceName());
+              .get(resourceName);
       if (currentState.getState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
         handOffEndTime =
             Math.min(handOffEndTime, currentState.getEndTime(partition.getPartitionName()));
       }
     }
 
-    if (handOffStartTime != TRANSITION_FAILED && handOffEndTime - handOffStartTime <= threshold) {
+    if (handOffStartTime > 0 && handOffEndTime - handOffStartTime <= threshold) {
       LOG.info(String.format("Missing topstate duration is %d for partition %s",
           handOffEndTime - handOffStartTime, partition.getPartitionName()));
       if (clusterStatusMonitor != null) {
-        clusterStatusMonitor.updateMissingTopStateDurationStats(resource.getResourceName(),
-            handOffEndTime - handOffStartTime, true);
+        clusterStatusMonitor
+            .updateMissingTopStateDurationStats(resourceName, handOffEndTime - handOffStartTime,
+                true);
       }
     }
-    removeFromStatsMap(missingTopStateMap, resource, partition);
+    removeFromStatsMap(missingTopStateMap, resourceName, partition);
   }
 
   private void removeFromStatsMap(Map<String, Map<String, Long>> missingTopStateMap,
-      Resource resource, Partition partition) {
-    if (missingTopStateMap.containsKey(resource.getResourceName())) {
-      missingTopStateMap.get(resource.getResourceName()).remove(partition.getPartitionName());
+      String resourceName, Partition partition) {
+    if (missingTopStateMap.containsKey(resourceName)) {
+      missingTopStateMap.get(resourceName).remove(partition.getPartitionName());
     }
 
-    if (missingTopStateMap.get(resource.getResourceName()).size() == 0) {
-      missingTopStateMap.remove(resource.getResourceName());
+    if (missingTopStateMap.get(resourceName).size() == 0) {
+      missingTopStateMap.remove(resourceName);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7753b602/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
index 7b97ac3..3e0bae7 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
@@ -19,19 +19,10 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BaseStageTest;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.*;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Resource;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
@@ -39,9 +30,14 @@ import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+
 public class TestTopStateHandoffMetrics extends BaseStageTest {
   public final static String TEST_INPUT_FILE = "TestTopStateHandoffMetrics.json";
   public final static String INITIAL_CURRENT_STATES = "initialCurrentStates";
+  public final static String MISSING_TOP_STATES = "MissingTopStates";
   public final static String HANDOFF_CURRENT_STATES = "handoffCurrentStates";
   public final static String EXPECTED_DURATION = "expectedDuration";
   public final static String TEST_RESOURCE = "TestResource";
@@ -53,7 +49,8 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
     Resource resource = new Resource(TEST_RESOURCE);
     resource.setStateModelDefRef("MasterSlave");
     resource.addPartition(PARTITION);
-    event.addAttribute(AttributeName.RESOURCES.name(), Collections.singletonMap(TEST_RESOURCE, resource));
+    event.addAttribute(AttributeName.RESOURCES.name(),
+        Collections.singletonMap(TEST_RESOURCE, resource));
     ClusterStatusMonitor monitor = new ClusterStatusMonitor("TestCluster");
     monitor.active();
     event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
@@ -61,37 +58,188 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
 
   @Test(dataProvider = "successCurrentStateInput")
   public void testTopStateSuccessHandoff(Map<String, Map<String, String>> initialCurrentStates,
+      Map<String, Map<String, String>> missingTopStates,
       Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
     preSetup();
-    runCurrentStage(initialCurrentStates, handOffCurrentStates);
-    ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates, null);
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
     ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
 
     // Should have 1 transition succeeded due to threshold.
     Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 1);
+    Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 0);
 
     // Duration should match the expected result
-    Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(), (long) expectedDuration);
-    Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(), (long) expectedDuration);
+    Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(),
+        (long) expectedDuration);
+    Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(),
+        (long) expectedDuration);
   }
 
   @Test(dataProvider = "failedCurrentStateInput")
   public void testTopStateFailedHandoff(Map<String, Map<String, String>> initialCurrentStates,
+      Map<String, Map<String, String>> missingTopStates,
       Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
     preSetup();
     ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
     clusterConfig.setMissTopStateDurationThreshold(5000L);
     setClusterConfig(clusterConfig);
-    runCurrentStage(initialCurrentStates, handOffCurrentStates);
-    ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates, null);
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
     ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
 
     // Should have 1 transition failed due to threshold.
+    Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 0);
     Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 1);
 
     // No duration updated.
-    Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(), (long)expectedDuration);
-    Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(), (long) expectedDuration);
+    Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(),
+        (long) expectedDuration);
+    Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(),
+        (long) expectedDuration);
+  }
+
+  // Test handoff that are triggered by an offline master instance
+  @Test(dataProvider = "successCurrentStateInput", dependsOnMethods = "testTopStateSuccessHandoff")
+  public void testTopStateSuccessHandoffWithOfflineNode(
+      final Map<String, Map<String, String>> initialCurrentStates,
+      final Map<String, Map<String, String>> missingTopStates,
+      Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
+    final long offlineTimeBeforeMasterless = 125;
+
+    preSetup();
+    runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates,
+        new MissingStatesDataCacheInject() {
+          @Override
+          public void doInject(ClusterDataCache cache) {
+            Set<String> topStateNodes = new HashSet<>();
+            for (String instance : initialCurrentStates.keySet()) {
+              if (initialCurrentStates.get(instance).get("CurrentState").equals("MASTER")) {
+                topStateNodes.add(instance);
+              }
+            }
+            // Simulate the previous top state instance goes offline
+            if (!topStateNodes.isEmpty()) {
+              cache.getLiveInstances().keySet().removeAll(topStateNodes);
+              for (String topStateNode : topStateNodes) {
+                long originalStartTime =
+                    Long.parseLong(missingTopStates.get(topStateNode).get("StartTime"));
+                cache.getInstanceOfflineTimeMap()
+                    .put(topStateNode, originalStartTime - offlineTimeBeforeMasterless);
+              }
+            }
+          }
+        });
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
+
+    // Should have 1 transition succeeded due to threshold.
+    Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 1);
+    Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 0);
+
+    // Duration should match the expected result
+    Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(),
+        expectedDuration + offlineTimeBeforeMasterless);
+    Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(),
+        expectedDuration + offlineTimeBeforeMasterless);
+  }
+
+  // Test success with no available clue about previous master.
+  // For example, controller is just changed to a new node.
+  @Test(dataProvider = "successCurrentStateInput", dependsOnMethods = "testTopStateSuccessHandoff")
+  public void testHandoffDurationWithDefaultStartTime(
+      Map<String, Map<String, String>> initialCurrentStates,
+      Map<String, Map<String, String>> missingTopStates,
+      Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
+    preSetup();
+
+    // reset expectedDuration since current system time would be used
+    for (Map<String, String> states : handOffCurrentStates.values()) {
+      if (states.get("CurrentState").equals("MASTER")) {
+        expectedDuration = Long.parseLong(states.get("EndTime")) - System.currentTimeMillis();
+        break;
+      }
+    }
+
+    // No initialCurrentStates means no input can be used as the clue of the previous master.
+    runCurrentStage(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates, null);
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
+
+    // Should have 1 transition succeeded due to threshold.
+    Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 1);
+    Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 0);
+
+    // Duration should match the expected result.
+    // Note that the gap between expectedDuration calculated and monitor calculates the value should be allowed
+    Assert.assertTrue(
+        expectedDuration - monitor.getSuccessfulTopStateHandoffDurationCounter() < 1500);
+    Assert.assertTrue(
+        expectedDuration - monitor.getMaxSinglePartitionTopStateHandoffDurationGauge() < 1500);
+  }
+
+  /**
+   * Test success with only a pending message as the clue.
+   * For instance, if the master was dropped, there is no way to track the dropping time.
+   * So either use current system time.
+   *
+   * @see org.apache.helix.monitoring.mbeans.TestTopStateHandoffMetrics#testHandoffDurationWithDefaultStartTime
+   * Or we can check if any pending message to be used as the start time.
+   */
+  @Test(dataProvider = "successCurrentStateInput", dependsOnMethods = "testTopStateSuccessHandoff")
+  public void testHandoffDurationWithPendingMessage(
+      final Map<String, Map<String, String>> initialCurrentStates,
+      final Map<String, Map<String, String>> missingTopStates,
+      Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
+    final long messageTimeBeforeMasterless = 145;
+    preSetup();
+    // No initialCurrentStates means no input can be used as the clue of the previous master.
+    runCurrentStage(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates,
+        new MissingStatesDataCacheInject() {
+          @Override
+          public void doInject(ClusterDataCache cache) {
+            String topStateNode = null;
+            for (String instance : initialCurrentStates.keySet()) {
+              if (initialCurrentStates.get(instance).get("CurrentState").equals("MASTER")) {
+                topStateNode = instance;
+                break;
+              }
+            }
+            // Simulate the previous top state instance goes offline
+            if (topStateNode != null) {
+              long originalStartTime =
+                  Long.parseLong(missingTopStates.get(topStateNode).get("StartTime"));
+              // Inject a message that fit expectedDuration
+              Message message =
+                  new Message(Message.MessageType.STATE_TRANSITION, "thisisafakemessage");
+              message.setTgtSessionId(SESSION_PREFIX + topStateNode.split("_")[1]);
+              message.setToState("MASTER");
+              message.setCreateTimeStamp(originalStartTime - messageTimeBeforeMasterless);
+              message.setTgtName(topStateNode);
+              message.setResourceName(TEST_RESOURCE);
+              message.setPartitionName(PARTITION);
+              cache.cacheMessages(Collections.singletonList(message));
+            }
+          }
+        });
+
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
+
+    // Should have 1 transition succeeded due to threshold.
+    Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 1);
+    Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 0);
+
+    // Duration should match the expected result
+    Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(),
+        expectedDuration + messageTimeBeforeMasterless);
+    Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(),
+        expectedDuration + messageTimeBeforeMasterless);
   }
 
   private final String CURRENT_STATE = "CurrentState";
@@ -103,6 +251,7 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
   public Object[][] successCurrentState() {
     return loadInputData("succeeded");
   }
+
   @DataProvider(name = "failedCurrentStateInput")
   public Object[][] failedCurrentState() {
     return loadInputData("failed");
@@ -118,14 +267,18 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
 
       List<Map<String, Object>> inputs = (List<Map<String, Object>>) inputMaps.get(inputEntry);
       inputData = new Object[inputs.size()][];
-      for (int i = 0; i < inputs.size(); i++) {
+      for (int i = 0; i < inputData.length; i++) {
         Map<String, Map<String, String>> intialCurrentStates =
             (Map<String, Map<String, String>>) inputs.get(i).get(INITIAL_CURRENT_STATES);
+        Map<String, Map<String, String>> missingTopStates =
+            (Map<String, Map<String, String>>) inputs.get(i).get(MISSING_TOP_STATES);
         Map<String, Map<String, String>> handoffCurrentStates =
             (Map<String, Map<String, String>>) inputs.get(i).get(HANDOFF_CURRENT_STATES);
         Long expectedDuration = Long.parseLong((String) inputs.get(i).get(EXPECTED_DURATION));
 
-        inputData[i] = new Object[] { intialCurrentStates, handoffCurrentStates, expectedDuration };
+        inputData[i] = new Object[] { intialCurrentStates, missingTopStates, handoffCurrentStates,
+            expectedDuration
+        };
       }
     } catch (IOException e) {
       e.printStackTrace();
@@ -151,12 +304,30 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
   }
 
   private void runCurrentStage(Map<String, Map<String, String>> initialCurrentStates,
-      Map<String, Map<String, String>> handOffCurrentStates) {
-    setupCurrentStates(generateCurrentStateMap(initialCurrentStates));
+      Map<String, Map<String, String>> missingTopStates,
+      Map<String, Map<String, String>> handOffCurrentStates,
+      MissingStatesDataCacheInject testInjection) {
+
+    if (initialCurrentStates != null && !initialCurrentStates.isEmpty()) {
+      setupCurrentStates(generateCurrentStateMap(initialCurrentStates));
+      runStage(event, new ReadClusterDataStage());
+      runStage(event, new CurrentStateComputationStage());
+    }
+
+    setupCurrentStates(generateCurrentStateMap(missingTopStates));
     runStage(event, new ReadClusterDataStage());
+    if (testInjection != null) {
+      ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+      testInjection.doInject(cache);
+    }
     runStage(event, new CurrentStateComputationStage());
+
     setupCurrentStates(generateCurrentStateMap(handOffCurrentStates));
     runStage(event, new ReadClusterDataStage());
     runStage(event, new CurrentStateComputationStage());
   }
+
+  interface MissingStatesDataCacheInject {
+    void doInject(ClusterDataCache cache);
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7753b602/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
----------------------------------------------------------------------
diff --git a/helix-core/src/test/resources/TestTopStateHandoffMetrics.json b/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
index c4e4f79..41393ea 100644
--- a/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
+++ b/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
@@ -1,178 +1,258 @@
 {
-  "succeeded" : [
+  "succeeded": [
     {
       "initialCurrentStates": {
-        "localhost_0" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "MASTER",
-          "StartTime" : "15000",
-          "EndTime" : "18000"
-        },
-        "localhost_1" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "OFFLINE",
-          "StartTime" : "8000",
-          "EndTime" : "10000"
-        },
-        "localhost_2" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "OFFLINE",
-          "StartTime" : "8000",
-          "EndTime" : "10000"
+        "localhost_0": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": "10000",
+          "EndTime": "11000"
+        },
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        }
+      },
+      "MissingTopStates": {
+        "localhost_0": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "15000",
+          "EndTime": "18000"
+        },
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
         }
       },
       "handoffCurrentStates": {
-        "localhost_0" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "MASTER",
-          "StartTime" : "15000",
-          "EndTime" : "18000"
-        },
-        "localhost_1" : {
-          "CurrentState" : "MASTER",
-          "PreviousState" : "SLAVE",
-          "StartTime" : "20000",
-          "EndTime" : "22000"
-        },
-        "localhost_2" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "OFFLINE",
-          "StartTime" : "8000",
-          "EndTime" : "10000"
+        "localhost_0": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "15000",
+          "EndTime": "18000"
+        },
+        "localhost_1": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": "20000",
+          "EndTime": "22000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
         }
       },
-      "expectedDuration" : "7000"
+      "expectedDuration": "7000"
     },
     {
-    "initialCurrentStates": {
-      "localhost_0" : {
-        "CurrentState" : "SLAVE",
-        "PreviousState" : "MASTER",
-        "StartTime" : "1000",
-        "EndTime" : "2000"
-      },
-      "localhost_1" : {
-        "CurrentState" : "SLAVE",
-        "PreviousState" : "MASTER",
-        "StartTime" : "8000",
-        "EndTime" : "10000"
+      "initialCurrentStates": {
+        "localhost_0": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "1000",
+          "EndTime": "2000"
+        },
+        "localhost_1": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": "2500",
+          "EndTime": "5000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        }
       },
-      "localhost_2" : {
-        "CurrentState" : "SLAVE",
-        "PreviousState" : "OFFLINE",
-        "StartTime" : "8000",
-        "EndTime" : "10000"
-      }
-    },
-    "handoffCurrentStates": {
-      "localhost_0" : {
-        "CurrentState" : "SLAVE",
-        "PreviousState" : "MASTER",
-        "StartTime" : "1000",
-        "EndTime" : "2000"
+      "MissingTopStates": {
+        "localhost_0": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "1000",
+          "EndTime": "2000"
+        },
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        }
       },
-      "localhost_1" : {
-        "CurrentState" : "MASTER",
-        "PreviousState" : "SLAVE",
-        "StartTime" : "20000",
-        "EndTime" : "22000"
+      "handoffCurrentStates": {
+        "localhost_0": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "1000",
+          "EndTime": "2000"
+        },
+        "localhost_1": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": "20000",
+          "EndTime": "22000"
+        },
+        "localhost_2": {
+          "CurrentState": "ERROR",
+          "PreviousState": "SLAVE",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        }
       },
-      "localhost_2" : {
-        "CurrentState" : "ERROR",
-        "PreviousState" : "SLAVE",
-        "StartTime" : "8000",
-        "EndTime" : "10000"
-      }
-    },
-    "expectedDuration" : "14000"
+      "expectedDuration": "14000"
     }
   ],
-  "failed" : [
+  "failed": [
     {
       "initialCurrentStates": {
-        "localhost_0" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "MASTER",
-          "StartTime" : "15000",
-          "EndTime" : "18000"
-        },
-        "localhost_1" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "OFFLINE",
-          "StartTime" : "8000",
-          "EndTime" : "10000"
-        },
-        "localhost_2" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "OFFLINE",
-          "StartTime" : "8000",
-          "EndTime" : "10000"
+        "localhost_0": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": "10000",
+          "EndTime": "11000"
+        },
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        }
+      },
+      "MissingTopStates": {
+        "localhost_0": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "15000",
+          "EndTime": "18000"
+        },
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
         }
       },
       "handoffCurrentStates": {
-        "localhost_0" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "MASTER",
-          "StartTime" : "15000",
-          "EndTime" : "18000"
-        },
-        "localhost_1" : {
-          "CurrentState" : "MASTER",
-          "PreviousState" : "SLAVE",
-          "StartTime" : "20000",
-          "EndTime" : "22000"
-        },
-        "localhost_2" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "OFFLINE",
-          "StartTime" : "8000",
-          "EndTime" : "10000"
+        "localhost_0": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "15000",
+          "EndTime": "18000"
+        },
+        "localhost_1": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": "20000",
+          "EndTime": "22000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
         }
       },
-      "expectedDuration" : "0"
+      "expectedDuration": "0"
     },
     {
       "initialCurrentStates": {
-        "localhost_0" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "MASTER",
-          "StartTime" : "15000",
-          "EndTime" : "18000"
-        },
-        "localhost_1" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "OFFLINE",
-          "StartTime" : "8000",
-          "EndTime" : "10000"
-        },
-        "localhost_2" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "MASTER",
-          "StartTime" : "8000",
-          "EndTime" : "10000"
+        "localhost_0": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": "10000",
+          "EndTime": "11000"
+        },
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        }
+      },
+      "MissingTopStates": {
+        "localhost_0": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "15000",
+          "EndTime": "18000"
+        },
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "8000",
+          "EndTime": "10000"
         }
       },
       "handoffCurrentStates": {
-        "localhost_0" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "MASTER",
-          "StartTime" : "15000",
-          "EndTime" : "18000"
-        },
-        "localhost_1" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "OFFLINE",
-          "StartTime" : "20000",
-          "EndTime" : "22000"
-        },
-        "localhost_2" : {
-          "CurrentState" : "SLAVE",
-          "PreviousState" : "OFFLINE",
-          "StartTime" : "8000",
-          "EndTime" : "10000"
+        "localhost_0": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "15000",
+          "EndTime": "18000"
+        },
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "20000",
+          "EndTime": "22000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "8000",
+          "EndTime": "10000"
         }
       },
-      "expectedDuration" : "0"
+      "expectedDuration": "0"
     }
   ]
 }