You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/25 23:03:11 UTC

helix git commit: [HELIX-753] Record top state handoff finished in single cluster data cache refresh

Repository: helix
Updated Branches:
  refs/heads/master ca364aecf -> 67ff66b48


[HELIX-753] Record top state handoff finished in single cluster data cache refresh


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/67ff66b4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/67ff66b4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/67ff66b4

Branch: refs/heads/master
Commit: 67ff66b4897309c785b8b42863e95734eba81aab
Parents: ca364ae
Author: Harry Zhang <hr...@linkedin.com>
Authored: Fri Sep 21 14:32:15 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Thu Oct 25 16:02:22 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  17 +-
 .../helix/controller/stages/AttributeName.java  |   3 +-
 .../helix/controller/stages/ClusterEvent.java   |  10 +-
 .../stages/CurrentStateComputationStage.java    | 293 +++++++++++++++----
 .../mbeans/TestTopStateHandoffMetrics.java      | 182 ++++++------
 .../resources/TestTopStateHandoffMetrics.json   |  48 +++
 6 files changed, 410 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/67ff66b4/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 6fd4623..b7934b5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -142,6 +142,11 @@ public class GenericHelixController implements IdealStateChangeListener,
   private ClusterDataCache _taskCache;
   private ScheduledExecutorService _asyncTasksThreadPool;
 
+  /**
+   * A record of last pipeline finish timestamp
+   */
+  private long _lastPipelineEndTimestamp;
+
   private String _clusterName;
 
   enum PipelineTypes {
@@ -371,6 +376,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     _taskEventThread = new ClusterEventProcessor(_taskCache, _taskEventQueue);
 
     _forceRebalanceTimer = new Timer();
+    _lastPipelineEndTimestamp = CurrentStateComputationStage.NOT_RECORDED;
 
     initializeAsyncFIFOWorkers();
     initPipelines(_eventThread, _cache, false);
@@ -461,6 +467,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     // add the cache
     _cache.setEventId(event.getEventId());
     event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+    event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), _lastPipelineEndTimestamp);
 
     List<Pipeline> pipelines = cache.isTaskCache() ?
         _taskRegistry.getPipelinesForEvent(event.getEventType()) : _registry
@@ -516,11 +523,12 @@ public class GenericHelixController implements IdealStateChangeListener,
     if (!rebalanceFail) {
       _continousRebalanceFailureCount = 0;
     }
-    long endTime = System.currentTimeMillis();
+
+    _lastPipelineEndTimestamp = System.currentTimeMillis();
     logger.info(String
         .format("END: Invoking %s controller pipeline for event: %s %s for cluster %s, took %d ms",
             getPipelineType(cache.isTaskCache()), event.getEventType(), event.getEventId(),
-            manager.getClusterName(), (endTime - startTime)));
+            manager.getClusterName(), (_lastPipelineEndTimestamp - startTime)));
 
     if (!cache.isTaskCache()) {
       // report event process durations
@@ -546,13 +554,14 @@ public class GenericHelixController implements IdealStateChangeListener,
                 startTime - enqueueTime);
         _clusterStatusMonitor
             .updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(),
-                endTime - startTime);
+                _lastPipelineEndTimestamp - startTime);
       }
       sb.append(String.format(
           "InQueue time for event: " + event.getEventType() + " took: " + (startTime - enqueueTime)
               + " ms\n"));
       sb.append(String.format(
-          "TotalProcessed time for event: " + event.getEventType() + " took: " + (endTime
+          "TotalProcessed time for event: " + event.getEventType() + " took: " + (
+              _lastPipelineEndTimestamp
               - startTime) + " ms"));
       logger.info(sb.toString());
     } else if (_isMonitoring) {

http://git-wip-us.apache.org/repos/asf/helix/blob/67ff66b4/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 56bbb44..55f7081 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -37,5 +37,6 @@ public enum AttributeName {
   instanceName,
   eventData,
   AsyncFIFOWorkerPool,
-  PipelineType
+  PipelineType,
+  LastRebalanceFinishTimeStamp
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/67ff66b4/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
index e82b0e4..a65e9f0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
@@ -93,11 +93,13 @@ public class ClusterEvent {
 
   @SuppressWarnings("unchecked")
   public <T extends Object> T getAttribute(String attrName) {
+    return getAttributeWithDefault(attrName, null);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends Object> T getAttributeWithDefault(String attrName, T defaultVal) {
     Object ret = _eventAttributeMap.get(attrName);
-    if (ret != null) {
-      return (T) ret;
-    }
-    return null;
+    return ret == null ? defaultVal : (T) ret;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/67ff66b4/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 340a051..0713070 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -19,20 +19,19 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.*;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
-import org.apache.helix.controller.LogUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 
 /**
  * For each LiveInstances select currentState and message whose sessionId matches
@@ -42,14 +41,16 @@ import java.util.Map;
 public class CurrentStateComputationStage extends AbstractBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(CurrentStateComputationStage.class);
 
-  public final long NOT_RECORDED = -1L;
-  public final long TRANSITION_FAILED = -2L;
-  public final String TASK_STATE_MODEL_NAME = "Task";
+  public static final long NOT_RECORDED = -1L;
+  public static final long TRANSITION_FAILED = -2L;
+  public static final String TASK_STATE_MODEL_NAME = "Task";
 
   @Override
   public void process(ClusterEvent event) throws Exception {
     _eventId = event.getEventId();
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    final Long lastPipelineFinishTimestamp = event
+        .getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(), NOT_RECORDED);
     final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
 
     if (cache == null || resourceMap == null) {
@@ -79,7 +80,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       ClusterStatusMonitor clusterStatusMonitor =
           event.getAttribute(AttributeName.clusterStatusMonitor.name());
       // TODO Update the status async -- jjwang
-      updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput);
+      updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput,
+          lastPipelineFinishTimestamp);
     }
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
   }
@@ -220,7 +222,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
 
   private void updateTopStateStatus(ClusterDataCache cache,
       ClusterStatusMonitor clusterStatusMonitor, Map<String, Resource> resourceMap,
-      CurrentStateOutput currentStateOutput) {
+      CurrentStateOutput currentStateOutput,
+      long lastPipelineFinishTimestamp) {
     Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
     Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
 
@@ -244,53 +247,232 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       String resourceName = resource.getResourceName();
 
       for (Partition partition : resource.getPartitions()) {
-        Map<String, String> stateMap =
-            currentStateOutput.getCurrentStateMap(resourceName, partition);
-
-        for (String instance : stateMap.keySet()) {
-          if (stateMap.get(instance).equals(stateModelDef.getTopState())) {
-            if (!lastTopStateMap.containsKey(resourceName)) {
-              lastTopStateMap.put(resourceName, new HashMap<String, String>());
-            }
-            // recording any top state, it is enough for tracking the top state changes.
-            lastTopStateMap.get(resourceName).put(partition.getPartitionName(), instance);
-            break;
-          }
-        }
-
-        if (stateMap.values().contains(stateModelDef.getTopState())) {
-          // Top state comes back
-          // The first time participant started or controller switched will be ignored
-          reportTopStateComesBack(cache, stateMap, missingTopStateMap, resourceName, partition,
-              clusterStatusMonitor, durationThreshold, stateModelDef.getTopState());
+        String currentTopStateInstance =
+            findCurrentTopStateLocation(currentStateOutput, resourceName, partition, stateModelDef);
+        String lastTopStateInstance = findCachedTopStateLocation(cache, resourceName, partition);
+
+        if (currentTopStateInstance != null) {
+          reportTopStateExistance(cache, currentStateOutput, stateModelDef, resourceName, partition,
+              lastTopStateInstance, currentTopStateInstance, clusterStatusMonitor,
+              durationThreshold, lastPipelineFinishTimestamp);
+          updateCachedTopStateLocation(cache, resourceName, partition, currentTopStateInstance);
         } else {
-          // TODO: improve following with MIN_ACTIVE_TOP_STATE logic
-          // Missing top state need to record
-          reportNewTopStateMissing(cache, missingTopStateMap, lastTopStateMap, resourceName,
+          reportTopStateMissing(cache, missingTopStateMap, lastTopStateMap, resourceName,
               partition, stateModelDef.getTopState(), currentStateOutput);
+          reportTopStateHandoffFailIfNecessary(cache, resourceName, partition, durationThreshold,
+              clusterStatusMonitor);
         }
       }
     }
 
-    // Check whether it is already passed threshold
-    for (String resourceName : missingTopStateMap.keySet()) {
-      for (String partitionName : missingTopStateMap.get(resourceName).keySet()) {
-        Long startTime = missingTopStateMap.get(resourceName).get(partitionName);
-        if (startTime != null && startTime > 0
-            && System.currentTimeMillis() - startTime > durationThreshold) {
-          missingTopStateMap.get(resourceName).put(partitionName, TRANSITION_FAILED);
-          if (clusterStatusMonitor != null) {
-            clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, false);
-          }
-        }
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.resetMaxMissingTopStateGauge();
+    }
+  }
+
+  /**
+   * From current state output, find out the location of the top state of given resource
+   * and partition
+   *
+   * @param currentStateOutput current state output
+   * @param resourceName resource name
+   * @param partition partition of the resource
+   * @param stateModelDef state model def object
+   * @return name of the node that contains top state, null if there is no top state recorded
+   */
+  private String findCurrentTopStateLocation(CurrentStateOutput currentStateOutput,
+      String resourceName, Partition partition, StateModelDefinition stateModelDef) {
+    Map<String, String> stateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
+    for (String instance : stateMap.keySet()) {
+      if (stateMap.get(instance).equals(stateModelDef.getTopState())) {
+        return instance;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Find cached top state location of the given resource and partition
+   *
+   * @param cache cluster data cache object
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @return cached name of the node that contains top state, null if not previously cached
+   */
+  private String findCachedTopStateLocation(ClusterDataCache cache, String resourceName, Partition partition) {
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+    return lastTopStateMap.containsKey(resourceName) && lastTopStateMap.get(resourceName)
+        .containsKey(partition.getPartitionName()) ? lastTopStateMap.get(resourceName)
+        .get(partition.getPartitionName()) : null;
+  }
+
+  /**
+   * Update top state location cache of the given resource and partition
+   *
+   * @param cache cluster data cache object
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param currentTopStateInstance name of the instance that currently has the top state
+   */
+  private void updateCachedTopStateLocation(ClusterDataCache cache, String resourceName,
+      Partition partition, String currentTopStateInstance) {
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+    if (!lastTopStateMap.containsKey(resourceName)) {
+      lastTopStateMap.put(resourceName, new HashMap<String, String>());
+    }
+    lastTopStateMap.get(resourceName).put(partition.getPartitionName(), currentTopStateInstance);
+  }
+
+  /**
+   * When we observe a top state of a given resource and partition, we need to report for the
+   * following 2 scenarios:
+   *  1. This is a top state coming back, i.e. we have a previous missing top state record
+   *  2. Top state location change, i.e. current top state location is different from what
+   *     we saw previously
+   *
+   * @param cache cluster data cache
+   * @param currentStateOutput generated after computing current state
+   * @param stateModelDef State model definition object of the given resource
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param lastTopStateInstance our cached top state location
+   * @param currentTopStateInstance top state location we observed during this pipeline run
+   * @param clusterStatusMonitor monitor object
+   * @param durationThreshold top state handoff duration threshold
+   * @param lastPipelineFinishTimestamp timestamp when last pipeline run finished
+   */
+  private void reportTopStateExistance(ClusterDataCache cache, CurrentStateOutput currentStateOutput,
+      StateModelDefinition stateModelDef, String resourceName, Partition partition,
+      String lastTopStateInstance, String currentTopStateInstance,
+      ClusterStatusMonitor clusterStatusMonitor, long durationThreshold,
+      long lastPipelineFinishTimestamp) {
+
+    Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
+
+    if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
+        .containsKey(partition.getPartitionName())) {
+      // We previously recorded a top state missing, and it's now coming back
+      reportTopStateComesBack(cache, currentStateOutput.getCurrentStateMap(resourceName, partition),
+          missingTopStateMap, resourceName, partition, clusterStatusMonitor, durationThreshold,
+          stateModelDef.getTopState());
+    } else if (lastTopStateInstance != null && !lastTopStateInstance
+        .equals(currentTopStateInstance)) {
+      // With no missing top state record, but top state instance changed,
+      // we observed an entire top state handoff process
+      reportSingleTopStateHandoff(cache, lastTopStateInstance, currentTopStateInstance,
+          resourceName, partition, clusterStatusMonitor, lastPipelineFinishTimestamp);
+    } else {
+      // else, there is no top state change, or the top state came up for the first time; do nothing
+      LogUtil.logDebug(LOG, _eventId, String.format(
+          "No top state hand off or first-seen top state for %s. CurNode: %s, LastNode: %s.",
+          partition.getPartitionName(), currentTopStateInstance, lastTopStateInstance));
+    }
+  }
+
+  /**
+   * This function calculates duration of a full top state handoff, observed in 1 pipeline run,
+   * i.e. current top state instance loaded from ZK is different than the one we cached during
+   * last pipeline run.
+   *
+   * @param cache ClusterDataCache
+   * @param lastTopStateInstance Name of last top state instance we cached
+   * @param curTopStateInstance Name of current top state instance we refreshed from ZK
+   * @param resourceName resource name
+   * @param partition partition object
+   * @param clusterStatusMonitor cluster state monitor object
+   * @param lastPipelineFinishTimestamp last pipeline run finish timestamp
+   */
+  private void reportSingleTopStateHandoff(ClusterDataCache cache, String lastTopStateInstance,
+      String curTopStateInstance, String resourceName, Partition partition,
+      ClusterStatusMonitor clusterStatusMonitor, long lastPipelineFinishTimestamp) {
+    if (curTopStateInstance.equals(lastTopStateInstance)) {
+      return;
+    }
+
+    // Current state output generation logic guarantees that current top state instance
+    // must be a live instance
+    String curTopStateSession = cache.getLiveInstances().get(curTopStateInstance).getSessionId();
+    long endTime =
+        cache.getCurrentState(curTopStateInstance, curTopStateSession).get(resourceName)
+            .getEndTime(partition.getPartitionName());
+
+    long startTime = NOT_RECORDED;
+    if (cache.getLiveInstances().containsKey(lastTopStateInstance)) {
+      String lastTopStateSession =
+          cache.getLiveInstances().get(lastTopStateInstance).getSessionId();
+      // Make sure last top state instance has not bounced during cluster data cache refresh
+      // We need this null check as there are test cases creating incomplete current state
+      if (cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
+          != null) {
+        startTime =
+            cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
+                .getStartTime(partition.getPartitionName());
       }
     }
+    if (startTime == NOT_RECORDED) {
+      // Either the cached last top state instance is no longer alive, or it bounced during cluster
+      // data cache refresh; use the last pipeline run end time as a best guess. Though we could
+      // calculate this number more precisely by refreshing data from ZK, given the rarity of
+      // this corner case, it's not worth it.
+      startTime = lastPipelineFinishTimestamp;
+    }
+
+    if (startTime == NOT_RECORDED || startTime > endTime) {
+      // Top state handoff finished before the end of the last pipeline run, and the instance
+      // holding the previous top state is no longer alive, so our best guess does not work;
+      // ignore the data point for now.
+      LogUtil.logWarn(LOG, _eventId, String
+          .format("Cannot confirm top state missing start time. %s:%s->%s. Likely it was very fast",
+              partition.getPartitionName(), lastTopStateInstance, curTopStateInstance));
+      return;
+    }
+
+    LogUtil.logInfo(LOG, _eventId, String.format("Missing topstate duration is %d for partition %s",
+        endTime - startTime, partition.getPartitionName()));
     if (clusterStatusMonitor != null) {
-      clusterStatusMonitor.resetMaxMissingTopStateGauge();
+      clusterStatusMonitor
+          .updateMissingTopStateDurationStats(resourceName, endTime - startTime,
+              true);
+    }
+  }
+
+  /**
+   * Check if the given partition of the given resource has a missing top state duration larger
+   * than the threshold, if so, report a top state transition failure
+   *
+   * @param cache cluster data cache
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param durationThreshold top state handoff duration threshold
+   * @param clusterStatusMonitor monitor object
+   */
+  private void reportTopStateHandoffFailIfNecessary(ClusterDataCache cache, String resourceName,
+      Partition partition, long durationThreshold, ClusterStatusMonitor clusterStatusMonitor) {
+    Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
+    String partitionName = partition.getPartitionName();
+    Long startTime = missingTopStateMap.get(resourceName).get(partitionName);
+    if (startTime != null && startTime > 0
+        && System.currentTimeMillis() - startTime > durationThreshold) {
+      missingTopStateMap.get(resourceName).put(partitionName, TRANSITION_FAILED);
+      if (clusterStatusMonitor != null) {
+        clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, false);
+      }
     }
   }
 
-  private void reportNewTopStateMissing(ClusterDataCache cache,
+  /**
+   * When we find a top state missing of the given partition, we find out when it started to miss
+   * top state, then we record it in cache
+   *
+   * @param cache cluster data cache
+   * @param missingTopStateMap missing top state record
+   * @param lastTopStateMap our cached last top state locations
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param topState top state name
+   * @param currentStateOutput current state output
+   */
+  private void reportTopStateMissing(ClusterDataCache cache,
       Map<String, Map<String, Long>> missingTopStateMap,
       Map<String, Map<String, String>> lastTopStateMap, String resourceName, Partition partition,
       String topState, CurrentStateOutput currentStateOutput) {
@@ -365,14 +547,23 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     }
   }
 
+  /**
+   * When we see a top state come back, i.e. we observe a top state in this pipeline run,
+   * but have a top state missing record before, we need to remove the top state missing
+   * record and report top state handoff duration
+   *
+   * @param cache cluster data cache
+   * @param stateMap state map of the given partition of the given resource
+   * @param missingTopStateMap missing top state record
+   * @param resourceName resource name
+   * @param partition partition of the resource
+   * @param clusterStatusMonitor monitor object
+   * @param threshold top state handoff threshold
+   * @param topState name of the top state
+   */
   private void reportTopStateComesBack(ClusterDataCache cache, Map<String, String> stateMap,
       Map<String, Map<String, Long>> missingTopStateMap, String resourceName, Partition partition,
       ClusterStatusMonitor clusterStatusMonitor, long threshold, String topState) {
-    if (!missingTopStateMap.containsKey(resourceName) || !missingTopStateMap.get(resourceName)
-        .containsKey(partition.getPartitionName())) {
-      // there is no previous missing recorded
-      return;
-    }
 
     long handOffStartTime = missingTopStateMap.get(resourceName).get(partition.getPartitionName());
 

http://git-wip-us.apache.org/repos/asf/helix/blob/67ff66b4/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
index a60d79f..fac957c 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
@@ -19,6 +19,7 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import com.google.common.collect.Range;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
@@ -51,7 +52,7 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
   public final static String TEST_RESOURCE = "TestResource";
   public final static String PARTITION = "PARTITION";
 
-  public void preSetup() {
+  private void preSetup() {
     setupLiveInstances(3);
     setupStateModel();
     Resource resource = new Resource(TEST_RESOURCE);
@@ -59,6 +60,8 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
     resource.addPartition(PARTITION);
     event.addAttribute(AttributeName.RESOURCES.name(),
         Collections.singletonMap(TEST_RESOURCE, resource));
+    event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(),
+        CurrentStateComputationStage.NOT_RECORDED);
     ClusterStatusMonitor monitor = new ClusterStatusMonitor("TestCluster");
     monitor.active();
     event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
@@ -69,20 +72,49 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
       Map<String, Map<String, String>> missingTopStates,
       Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
     preSetup();
-    runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates, null);
-    ClusterStatusMonitor clusterStatusMonitor =
-        event.getAttribute(AttributeName.clusterStatusMonitor.name());
-    ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
+    runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates, null, 1, 0,
+        Range.closed(expectedDuration, expectedDuration),
+        Range.closed(expectedDuration, expectedDuration));
+  }
 
-    // Should have 1 transition succeeded due to threshold.
-    Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 1);
-    Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 0);
+  @Test(dataProvider = "fastCurrentStateInput")
+  public void testFastTopStateHandoffWithNoMissingTopState(
+      Map<String, Map<String, String>> initialCurrentStates,
+      Map<String, Map<String, String>> missingTopStates,
+      Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration
+  ) {
+    preSetup();
+    runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates, null, 1, 0,
+        Range.closed(expectedDuration, expectedDuration),
+        Range.closed(expectedDuration, expectedDuration));
+  }
 
-    // Duration should match the expected result
-    Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(),
-        (long) expectedDuration);
-    Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(),
-        (long) expectedDuration);
+  @Test(dataProvider = "fastCurrentStateInput")
+  public void testFastTopStateHandoffWithNoMissingTopStateAndOldInstanceCrash(
+      Map<String, Map<String, String>> initialCurrentStates,
+      Map<String, Map<String, String>> missingTopStates,
+      Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration
+  ) {
+    preSetup();
+    event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), 7500L);
+    // By simulating last master instance crash, we now have:
+    //  - M->S from 6000 to 7000
+    //  - lastPipelineFinishTimestamp is 7500
+    //  - S->M from 8000 to 9000
+    // Therefore the recorded latency should be 9000 - 7500 = 1500
+    runStageAndVerify(
+        initialCurrentStates, missingTopStates, handOffCurrentStates,
+        new MissingStatesDataCacheInject() {
+          @Override
+          public void doInject(ClusterDataCache cache) {
+            cache.getLiveInstances().remove("localhost_1");
+          }
+        }, 1, 0,
+        Range.closed(1500L, 1500L),
+        Range.closed(1500L, 1500L)
+    );
+    event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(),
+        CurrentStateComputationStage.NOT_RECORDED);
   }
 
   @Test(dataProvider = "failedCurrentStateInput")
@@ -93,20 +125,10 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
     ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
     clusterConfig.setMissTopStateDurationThreshold(5000L);
     setClusterConfig(clusterConfig);
-    runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates, null);
-    ClusterStatusMonitor clusterStatusMonitor =
-        event.getAttribute(AttributeName.clusterStatusMonitor.name());
-    ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
-
-    // Should have 1 transition failed due to threshold.
-    Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 0);
-    Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 1);
 
-    // No duration updated.
-    Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(),
-        (long) expectedDuration);
-    Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(),
-        (long) expectedDuration);
+    runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates, null, 0, 1,
+        Range.closed(expectedDuration, expectedDuration),
+        Range.closed(expectedDuration, expectedDuration));
   }
 
   // Test handoff that are triggered by an offline master instance
@@ -118,7 +140,8 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
     final long offlineTimeBeforeMasterless = 125;
 
     preSetup();
-    runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates,
+    long durationToVerify = expectedDuration + offlineTimeBeforeMasterless;
+    runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates,
         new MissingStatesDataCacheInject() {
           @Override
           public void doInject(ClusterDataCache cache) {
@@ -139,20 +162,8 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
               }
             }
           }
-        });
-    ClusterStatusMonitor clusterStatusMonitor =
-        event.getAttribute(AttributeName.clusterStatusMonitor.name());
-    ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
-
-    // Should have 1 transition succeeded due to threshold.
-    Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 1);
-    Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 0);
-
-    // Duration should match the expected result
-    Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(),
-        expectedDuration + offlineTimeBeforeMasterless);
-    Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(),
-        expectedDuration + offlineTimeBeforeMasterless);
+        }, 1, 0, Range.closed(durationToVerify, durationToVerify),
+        Range.closed(durationToVerify, durationToVerify));
   }
 
   // Test success with no available clue about previous master.
@@ -173,21 +184,8 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
     }
 
     // No initialCurrentStates means no input can be used as the clue of the previous master.
-    runCurrentStage(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates, null);
-    ClusterStatusMonitor clusterStatusMonitor =
-        event.getAttribute(AttributeName.clusterStatusMonitor.name());
-    ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
-
-    // Should have 1 transition succeeded due to threshold.
-    Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 1);
-    Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 0);
-
-    // Duration should match the expected result.
-    // Note that the gap between expectedDuration calculated and monitor calculates the value should be allowed
-    Assert.assertTrue(
-        expectedDuration - monitor.getSuccessfulTopStateHandoffDurationCounter() < 1500);
-    Assert.assertTrue(
-        expectedDuration - monitor.getMaxSinglePartitionTopStateHandoffDurationGauge() < 1500);
+    runStageAndVerify(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates, null, 1, 0,
+        Range.atMost(1500L), Range.atMost(1500L));
   }
 
   /**
@@ -205,11 +203,12 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
       Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
     final long messageTimeBeforeMasterless = 145;
     preSetup();
+
+    long durationToVerify = expectedDuration + messageTimeBeforeMasterless;
     // No initialCurrentStates means no input can be used as the clue of the previous master.
-    runCurrentStage(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates,
+    runStageAndVerify(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates,
         new MissingStatesDataCacheInject() {
-          @Override
-          public void doInject(ClusterDataCache cache) {
+          @Override public void doInject(ClusterDataCache cache) {
             String topStateNode = null;
             for (String instance : initialCurrentStates.keySet()) {
               if (initialCurrentStates.get(instance).get("CurrentState").equals("MASTER")) {
@@ -233,21 +232,8 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
               cache.cacheMessages(Collections.singletonList(message));
             }
           }
-        });
-
-    ClusterStatusMonitor clusterStatusMonitor =
-        event.getAttribute(AttributeName.clusterStatusMonitor.name());
-    ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
-
-    // Should have 1 transition succeeded due to threshold.
-    Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 1);
-    Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 0);
-
-    // Duration should match the expected result
-    Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(),
-        expectedDuration + messageTimeBeforeMasterless);
-    Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(),
-        expectedDuration + messageTimeBeforeMasterless);
+        }, 1, 0, Range.closed(durationToVerify, durationToVerify),
+        Range.closed(durationToVerify, durationToVerify));
   }
 
   private final String CURRENT_STATE = "CurrentState";
@@ -265,6 +251,11 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
     return loadInputData("failed");
   }
 
+  @DataProvider(name = "fastCurrentStateInput")
+  public Object[][] fastCurrentState() { // "fast" scenario rows; its handoffCurrentStates is empty
+    return this.loadInputData("fast");
+  }
+
   private Object[][] loadInputData(String inputEntry) {
     Object[][] inputData = null;
     InputStream inputStream = getClass().getClassLoader().getResourceAsStream(TEST_INPUT_FILE);
@@ -322,17 +313,42 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
       runStage(event, new CurrentStateComputationStage());
     }
 
-    setupCurrentStates(generateCurrentStateMap(missingTopStates));
-    runStage(event, new ReadClusterDataStage());
-    if (testInjection != null) {
-      ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
-      testInjection.doInject(cache);
+    if (missingTopStates != null && !missingTopStates.isEmpty()) {
+      setupCurrentStates(generateCurrentStateMap(missingTopStates));
+      runStage(event, new ReadClusterDataStage());
+      if (testInjection != null) {
+        ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+        testInjection.doInject(cache);
+      }
+      runStage(event, new CurrentStateComputationStage());
     }
-    runStage(event, new CurrentStateComputationStage());
 
-    setupCurrentStates(generateCurrentStateMap(handOffCurrentStates));
-    runStage(event, new ReadClusterDataStage());
-    runStage(event, new CurrentStateComputationStage());
+    if (handOffCurrentStates != null && !handOffCurrentStates.isEmpty()) {
+      setupCurrentStates(generateCurrentStateMap(handOffCurrentStates));
+      runStage(event, new ReadClusterDataStage());
+      runStage(event, new CurrentStateComputationStage());
+    }
+  }
+
+  // Runs runCurrentStage, then checks the handoff counters and latency metrics on TEST_RESOURCE.
+  private void runStageAndVerify(Map<String, Map<String, String>> initialCurrentStates,
+      Map<String, Map<String, String>> missingTopStates,
+      Map<String, Map<String, String>> handOffCurrentStates, MissingStatesDataCacheInject inject,
+      int successCnt, int failCnt, Range<Long> expectedDuration, Range<Long> expectedMaxDuration) {
+    runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates, inject);
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
+    Assert.assertNotNull(monitor, "No ResourceMonitor registered for " + TEST_RESOURCE);
+    Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), successCnt);
+    Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), failCnt);
+    // Put actual values in the messages; a bare assertTrue only reports "expected true".
+    long duration = monitor.getSuccessfulTopStateHandoffDurationCounter();
+    long maxDuration = monitor.getMaxSinglePartitionTopStateHandoffDurationGauge();
+    Assert.assertTrue(expectedDuration.contains(duration),
+        "Handoff duration " + duration + " not in " + expectedDuration);
+    Assert.assertTrue(expectedMaxDuration.contains(maxDuration),
+        "Max handoff duration " + maxDuration + " not in " + expectedMaxDuration);
+  }
 
   interface MissingStatesDataCacheInject {

http://git-wip-us.apache.org/repos/asf/helix/blob/67ff66b4/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
----------------------------------------------------------------------
diff --git a/helix-core/src/test/resources/TestTopStateHandoffMetrics.json b/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
index 41393ea..d296e6c 100644
--- a/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
+++ b/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
@@ -254,5 +254,53 @@
       },
       "expectedDuration": "0"
     }
+  ],
+  "fast": [
+    {
+      "initialCurrentStates": {
+        "localhost_0": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "1000",
+          "EndTime": "2000"
+        },
+        "localhost_1": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": "2500",
+          "EndTime": "5000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "1000",
+          "EndTime": "2000"
+        }
+      },
+      "MissingTopStates": {
+        "localhost_0": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": "8000",
+          "EndTime": "9000"
+        },
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "MASTER",
+          "StartTime": "6000",
+          "EndTime": "7000"
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": "1000",
+          "EndTime": "2000"
+        }
+      },
+      "handoffCurrentStates": {
+
+      },
+      "expectedDuration": "3000"
+    }
   ]
 }