Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/30 23:08:13 UTC

[2/2] helix git commit: [HELIX-771] More detailed top state handoff metrics

[HELIX-771] More detailed top state handoff metrics


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7e49f995
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7e49f995
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7e49f995

Branch: refs/heads/master
Commit: 7e49f995e29ea200fcc42ce6af148ed521979f5c
Parents: 9d7364d
Author: Harry Zhang <hr...@linkedin.com>
Authored: Tue Oct 30 15:55:20 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Tue Oct 30 15:58:45 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |   3 +-
 .../controller/stages/ClusterDataCache.java     |   4 +-
 .../stages/CurrentStateComputationStage.java    | 399 ---------------
 .../stages/MissingTopStateRecord.java           |  58 +++
 .../stages/TaskGarbageCollectionStage.java      |   8 +
 .../stages/TopStateHandoffReportStage.java      | 506 +++++++++++++++++++
 .../monitoring/mbeans/ClusterStatusMonitor.java |  37 +-
 .../monitoring/mbeans/ResourceMonitor.java      |  77 ++-
 .../java/org/apache/helix/task/TaskDriver.java  |   2 +-
 .../task/TestIndependentTaskRebalancer.java     |  60 ++-
 .../mbeans/TestTopStateHandoffMetrics.java      | 418 +++++++++------
 .../resources/TestTopStateHandoffMetrics.json   | 263 ++++++----
 12 files changed, 1103 insertions(+), 732 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 800a331..dd409e5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -269,6 +269,7 @@ public class GenericHelixController implements IdealStateChangeListener,
       dataPreprocess.addStage(new ResourceComputationStage());
       dataPreprocess.addStage(new ResourceValidationStage());
       dataPreprocess.addStage(new CurrentStateComputationStage());
+      dataPreprocess.addStage(new TopStateHandoffReportStage());
 
       // rebalance pipeline
       Pipeline rebalancePipeline = new Pipeline(pipelineName);
@@ -381,7 +382,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     _taskEventThread = new ClusterEventProcessor(_taskCache, _taskEventQueue);
 
     _forceRebalanceTimer = new Timer();
-    _lastPipelineEndTimestamp = CurrentStateComputationStage.NOT_RECORDED;
+    _lastPipelineEndTimestamp = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
 
     initializeAsyncFIFOWorkers();
     initPipelines(_eventThread, _cache, false);

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index e1a374e..6de6d51 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -85,7 +85,7 @@ public class ClusterDataCache extends AbstractDataCache {
   private Map<String, ResourceConfig> _resourceConfigCacheMap;
   private Map<String, ClusterConstraints> _constraintMap;
   private Map<String, Map<String, String>> _idealStateRuleMap;
-  private Map<String, Map<String, Long>> _missingTopStateMap = new HashMap<>();
+  private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap = new HashMap<>();
   private Map<String, Map<String, String>> _lastTopStateLocationMap = new HashMap<>();
   private Map<String, ExternalView> _targetExternalViewMap = new HashMap<>();
   private Map<String, ExternalView> _externalViewMap = new HashMap<>();
@@ -678,7 +678,7 @@ public class ClusterDataCache extends AbstractDataCache {
     return null;
   }
 
-  public Map<String, Map<String, Long>> getMissingTopStateMap() {
+  public Map<String, Map<String, MissingTopStateRecord>> getMissingTopStateMap() {
     return _missingTopStateMap;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 0713070..d4927ec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.helix.controller.LogUtil;
@@ -28,7 +27,6 @@ import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.*;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,16 +39,10 @@ import org.slf4j.LoggerFactory;
 public class CurrentStateComputationStage extends AbstractBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(CurrentStateComputationStage.class);
 
-  public static final long NOT_RECORDED = -1L;
-  public static final long TRANSITION_FAILED = -2L;
-  public static final String TASK_STATE_MODEL_NAME = "Task";
-
   @Override
   public void process(ClusterEvent event) throws Exception {
     _eventId = event.getEventId();
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
-    final Long lastPipelineFinishTimestamp = event
-        .getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(), NOT_RECORDED);
     final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
 
     if (cache == null || resourceMap == null) {
@@ -75,14 +67,6 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
           instanceSessionId);
       updateCurrentStates(instance, currentStateMap.values(), currentStateOutput, resourceMap);
     }
-
-    if (!cache.isTaskCache()) {
-      ClusterStatusMonitor clusterStatusMonitor =
-          event.getAttribute(AttributeName.clusterStatusMonitor.name());
-      // TODO Update the status async -- jjwang
-      updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput,
-          lastPipelineFinishTimestamp);
-    }
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
   }
 
@@ -219,387 +203,4 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       currentStateOutput.setCancellationMessage(resourceName, partition, instanceName, message);
     }
   }
-
-  private void updateTopStateStatus(ClusterDataCache cache,
-      ClusterStatusMonitor clusterStatusMonitor, Map<String, Resource> resourceMap,
-      CurrentStateOutput currentStateOutput,
-      long lastPipelineFinishTimestamp) {
-    Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
-    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
-
-    long durationThreshold = Long.MAX_VALUE;
-    if (cache.getClusterConfig() != null) {
-      durationThreshold = cache.getClusterConfig().getMissTopStateDurationThreshold();
-    }
-
-    // Remove any resource records that no longer exists
-    missingTopStateMap.keySet().retainAll(resourceMap.keySet());
-    lastTopStateMap.keySet().retainAll(resourceMap.keySet());
-
-    for (Resource resource : resourceMap.values()) {
-      StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
-      if (stateModelDef == null || resource.getStateModelDefRef()
-          .equalsIgnoreCase(TASK_STATE_MODEL_NAME)) {
-        // Resource does not have valid statemodel or it is task state model
-        continue;
-      }
-
-      String resourceName = resource.getResourceName();
-
-      for (Partition partition : resource.getPartitions()) {
-        String currentTopStateInstance =
-            findCurrentTopStateLocation(currentStateOutput, resourceName, partition, stateModelDef);
-        String lastTopStateInstance = findCachedTopStateLocation(cache, resourceName, partition);
-
-        if (currentTopStateInstance != null) {
-          reportTopStateExistance(cache, currentStateOutput, stateModelDef, resourceName, partition,
-              lastTopStateInstance, currentTopStateInstance, clusterStatusMonitor,
-              durationThreshold, lastPipelineFinishTimestamp);
-          updateCachedTopStateLocation(cache, resourceName, partition, currentTopStateInstance);
-        } else {
-          reportTopStateMissing(cache, missingTopStateMap, lastTopStateMap, resourceName,
-              partition, stateModelDef.getTopState(), currentStateOutput);
-          reportTopStateHandoffFailIfNecessary(cache, resourceName, partition, durationThreshold,
-              clusterStatusMonitor);
-        }
-      }
-    }
-
-    if (clusterStatusMonitor != null) {
-      clusterStatusMonitor.resetMaxMissingTopStateGauge();
-    }
-  }
-
-  /**
-   * From current state output, find out the location of the top state of given resource
-   * and partition
-   *
-   * @param currentStateOutput current state output
-   * @param resourceName resource name
-   * @param partition partition of the resource
-   * @param stateModelDef state model def object
-   * @return name of the node that contains top state, null if there is not top state recorded
-   */
-  private String findCurrentTopStateLocation(CurrentStateOutput currentStateOutput,
-      String resourceName, Partition partition, StateModelDefinition stateModelDef) {
-    Map<String, String> stateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
-    for (String instance : stateMap.keySet()) {
-      if (stateMap.get(instance).equals(stateModelDef.getTopState())) {
-        return instance;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Find cached top state location of the given resource and partition
-   *
-   * @param cache cluster data cache object
-   * @param resourceName resource name
-   * @param partition partition of the given resource
-   * @return cached name of the node that contains top state, null if not previously cached
-   */
-  private String findCachedTopStateLocation(ClusterDataCache cache, String resourceName, Partition partition) {
-    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
-    return lastTopStateMap.containsKey(resourceName) && lastTopStateMap.get(resourceName)
-        .containsKey(partition.getPartitionName()) ? lastTopStateMap.get(resourceName)
-        .get(partition.getPartitionName()) : null;
-  }
-
-  /**
-   * Update top state location cache of the given resource and partition
-   *
-   * @param cache cluster data cache object
-   * @param resourceName resource name
-   * @param partition partition of the given resource
-   * @param currentTopStateInstance name of the instance that currently has the top state
-   */
-  private void updateCachedTopStateLocation(ClusterDataCache cache, String resourceName,
-      Partition partition, String currentTopStateInstance) {
-    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
-    if (!lastTopStateMap.containsKey(resourceName)) {
-      lastTopStateMap.put(resourceName, new HashMap<String, String>());
-    }
-    lastTopStateMap.get(resourceName).put(partition.getPartitionName(), currentTopStateInstance);
-  }
-
-  /**
-   * When we observe a top state of a given resource and partition, we need to report for the
-   * following 2 scenarios:
-   *  1. This is a top state come back, i.e. we have a previously missing top state record
-   *  2. Top state location change, i.e. current top state location is different from what
-   *     we saw previously
-   *
-   * @param cache cluster data cache
-   * @param currentStateOutput generated after computiting current state
-   * @param stateModelDef State model definition object of the given resource
-   * @param resourceName resource name
-   * @param partition partition of the given resource
-   * @param lastTopStateInstance our cached top state location
-   * @param currentTopStateInstance top state location we observed during this pipeline run
-   * @param clusterStatusMonitor monitor object
-   * @param durationThreshold top state handoff duration threshold
-   * @param lastPipelineFinishTimestamp timestamp when last pipeline run finished
-   */
-  private void reportTopStateExistance(ClusterDataCache cache, CurrentStateOutput currentStateOutput,
-      StateModelDefinition stateModelDef, String resourceName, Partition partition,
-      String lastTopStateInstance, String currentTopStateInstance,
-      ClusterStatusMonitor clusterStatusMonitor, long durationThreshold,
-      long lastPipelineFinishTimestamp) {
-
-    Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
-
-    if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
-        .containsKey(partition.getPartitionName())) {
-      // We previously recorded a top state missing, and it's not coming back
-      reportTopStateComesBack(cache, currentStateOutput.getCurrentStateMap(resourceName, partition),
-          missingTopStateMap, resourceName, partition, clusterStatusMonitor, durationThreshold,
-          stateModelDef.getTopState());
-    } else if (lastTopStateInstance != null && !lastTopStateInstance
-        .equals(currentTopStateInstance)) {
-      // With no missing top state record, but top state instance changed,
-      // we observed an entire top state handoff process
-      reportSingleTopStateHandoff(cache, lastTopStateInstance, currentTopStateInstance,
-          resourceName, partition, clusterStatusMonitor, lastPipelineFinishTimestamp);
-    } else {
-      // else, there is not top state change, or top state first came up, do nothing
-      LogUtil.logDebug(LOG, _eventId, String.format(
-          "No top state hand off or first-seen top state for %s. CurNode: %s, LastNode: %s.",
-          partition.getPartitionName(), currentTopStateInstance, lastTopStateInstance));
-    }
-  }
-
-  /**
-   * This function calculates duration of a full top state handoff, observed in 1 pipeline run,
-   * i.e. current top state instance loaded from ZK is different than the one we cached during
-   * last pipeline run.
-   *
-   * @param cache ClusterDataCache
-   * @param lastTopStateInstance Name of last top state instance we cached
-   * @param curTopStateInstance Name of current top state instance we refreshed from ZK
-   * @param resourceName resource name
-   * @param partition partition object
-   * @param clusterStatusMonitor cluster state monitor object
-   * @param lastPipelineFinishTimestamp last pipeline run finish timestamp
-   */
-  private void reportSingleTopStateHandoff(ClusterDataCache cache, String lastTopStateInstance,
-      String curTopStateInstance, String resourceName, Partition partition,
-      ClusterStatusMonitor clusterStatusMonitor, long lastPipelineFinishTimestamp) {
-    if (curTopStateInstance.equals(lastTopStateInstance)) {
-      return;
-    }
-
-    // Current state output generation logic guarantees that current top state instance
-    // must be a live instance
-    String curTopStateSession = cache.getLiveInstances().get(curTopStateInstance).getSessionId();
-    long endTime =
-        cache.getCurrentState(curTopStateInstance, curTopStateSession).get(resourceName)
-            .getEndTime(partition.getPartitionName());
-
-    long startTime = NOT_RECORDED;
-    if (cache.getLiveInstances().containsKey(lastTopStateInstance)) {
-      String lastTopStateSession =
-          cache.getLiveInstances().get(lastTopStateInstance).getSessionId();
-      // Make sure last top state instance has not bounced during cluster data cache refresh
-      // We need this null check as there are test cases creating incomplete current state
-      if (cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
-          != null) {
-        startTime =
-            cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
-                .getStartTime(partition.getPartitionName());
-      }
-    }
-    if (startTime == NOT_RECORDED) {
-      // either cached last top state instance is no longer alive, or it bounced during cluster
-      // data cache refresh, we use last pipeline run end time for best guess. Though we can
-      // calculate this number in a more precise way by refreshing data from ZK, given the rarity
-      // of this corner case, it's not worthy.
-      startTime = lastPipelineFinishTimestamp;
-    }
-
-    if (startTime == NOT_RECORDED || startTime > endTime) {
-      // Top state handoff finished before end of last pipeline run, and instance contains
-      // previous top state is no longer alive, so our best guess did not work, ignore the
-      // data point for now.
-      LogUtil.logWarn(LOG, _eventId, String
-          .format("Cannot confirm top state missing start time. %s:%s->%s. Likely it was very fast",
-              partition.getPartitionName(), lastTopStateInstance, curTopStateInstance));
-      return;
-    }
-
-    LogUtil.logInfo(LOG, _eventId, String.format("Missing topstate duration is %d for partition %s",
-        endTime - startTime, partition.getPartitionName()));
-    if (clusterStatusMonitor != null) {
-      clusterStatusMonitor
-          .updateMissingTopStateDurationStats(resourceName, endTime - startTime,
-              true);
-    }
-  }
-
-  /**
-   * Check if the given partition of the given resource has a missing top state duration larger
-   * than the threshold, if so, report a top state transition failure
-   *
-   * @param cache cluster data cache
-   * @param resourceName resource name
-   * @param partition partition of the given resource
-   * @param durationThreshold top state handoff duration threshold
-   * @param clusterStatusMonitor monitor object
-   */
-  private void reportTopStateHandoffFailIfNecessary(ClusterDataCache cache, String resourceName,
-      Partition partition, long durationThreshold, ClusterStatusMonitor clusterStatusMonitor) {
-    Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
-    String partitionName = partition.getPartitionName();
-    Long startTime = missingTopStateMap.get(resourceName).get(partitionName);
-    if (startTime != null && startTime > 0
-        && System.currentTimeMillis() - startTime > durationThreshold) {
-      missingTopStateMap.get(resourceName).put(partitionName, TRANSITION_FAILED);
-      if (clusterStatusMonitor != null) {
-        clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, false);
-      }
-    }
-  }
-
-  /**
-   * When we find a top state missing of the given partition, we find out when it started to miss
-   * top state, then we record it in cache
-   *
-   * @param cache cluster data cache
-   * @param missingTopStateMap missing top state record
-   * @param lastTopStateMap our cached last top state locations
-   * @param resourceName resource name
-   * @param partition partition of the given resource
-   * @param topState top state name
-   * @param currentStateOutput current state output
-   */
-  private void reportTopStateMissing(ClusterDataCache cache,
-      Map<String, Map<String, Long>> missingTopStateMap,
-      Map<String, Map<String, String>> lastTopStateMap, String resourceName, Partition partition,
-      String topState, CurrentStateOutput currentStateOutput) {
-    if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
-        .containsKey(partition.getPartitionName())) {
-      // a previous missing has been already recorded
-      return;
-    }
-
-    long startTime = NOT_RECORDED;
-
-    // 1. try to find the previous topstate missing event for the startTime.
-    String missingStateInstance = null;
-    if (lastTopStateMap.containsKey(resourceName)) {
-      missingStateInstance = lastTopStateMap.get(resourceName).get(partition.getPartitionName());
-    }
-
-    if (missingStateInstance != null) {
-      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-      if (liveInstances.containsKey(missingStateInstance)) {
-        CurrentState currentState = cache.getCurrentState(missingStateInstance,
-            liveInstances.get(missingStateInstance).getSessionId()).get(resourceName);
-
-        if (currentState != null
-            && currentState.getPreviousState(partition.getPartitionName()) != null && currentState
-            .getPreviousState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
-          // Update the latest start time only from top state to other state transition
-          // At beginning, the start time should -1 (not recorded). If something happen either
-          // instance not alive or the instance just started for that partition, Helix does not know
-          // the previous start time or end time. So we count from current.
-          //
-          // Previous state is top state does not mean that resource has only one top state
-          // (i.e. Online/Offline). So Helix has to find the latest start time as the staring point.
-          startTime = Math.max(startTime, currentState.getStartTime(partition.getPartitionName()));
-        } // Else no related state transition history found, use current time as the missing start time.
-      } else {
-        // If the previous topState holder is no longer alive, the offline time is used as start time.
-        Map<String, Long> offlineMap = cache.getInstanceOfflineTimeMap();
-        if (offlineMap.containsKey(missingStateInstance)) {
-          startTime = Math.max(startTime, offlineMap.get(missingStateInstance));
-        }
-      }
-    }
-
-    // 2. if no previous topstate records, use any pending message that are created for topstate transition
-    if (startTime == NOT_RECORDED) {
-      for (Message message : currentStateOutput.getPendingMessageMap(resourceName, partition)
-          .values()) {
-        // Only messages that match the current session ID will be recorded in the map.
-        // So no need to redundantly check here.
-        if (message.getToState().equals(topState)) {
-          startTime = Math.max(startTime, message.getCreateTimeStamp());
-        }
-      }
-    }
-
-    // 3. if no clue about previous topstate or any related pending message, use the current system time.
-    if (startTime == NOT_RECORDED) {
-      LogUtil.logWarn(LOG, _eventId,
-          "Cannot confirm top state missing start time. Use the current system time as the start time.");
-      startTime = System.currentTimeMillis();
-    }
-
-    if (!missingTopStateMap.containsKey(resourceName)) {
-      missingTopStateMap.put(resourceName, new HashMap<String, Long>());
-    }
-
-    Map<String, Long> partitionMap = missingTopStateMap.get(resourceName);
-    // Update the new partition without top state
-    if (!partitionMap.containsKey(partition.getPartitionName())) {
-      partitionMap.put(partition.getPartitionName(), startTime);
-    }
-  }
-
-  /**
-   * When we see a top state come back, i.e. we observe a top state in this pipeline run,
-   * but have a top state missing record before, we need to remove the top state missing
-   * record and report top state handoff duration
-   *
-   * @param cache cluster data cache
-   * @param stateMap state map of the given partition of the given resource
-   * @param missingTopStateMap missing top state record
-   * @param resourceName resource name
-   * @param partition partition of the resource
-   * @param clusterStatusMonitor monitor object
-   * @param threshold top state handoff threshold
-   * @param topState name of the top state
-   */
-  private void reportTopStateComesBack(ClusterDataCache cache, Map<String, String> stateMap,
-      Map<String, Map<String, Long>> missingTopStateMap, String resourceName, Partition partition,
-      ClusterStatusMonitor clusterStatusMonitor, long threshold, String topState) {
-
-    long handOffStartTime = missingTopStateMap.get(resourceName).get(partition.getPartitionName());
-
-    // Find the earliest end time from the top states
-    long handOffEndTime = System.currentTimeMillis();
-    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    for (String instanceName : stateMap.keySet()) {
-      CurrentState currentState =
-          cache.getCurrentState(instanceName, liveInstances.get(instanceName).getSessionId())
-              .get(resourceName);
-      if (currentState.getState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
-        handOffEndTime =
-            Math.min(handOffEndTime, currentState.getEndTime(partition.getPartitionName()));
-      }
-    }
-
-    if (handOffStartTime > 0 && handOffEndTime - handOffStartTime <= threshold) {
-      LogUtil.logInfo(LOG, _eventId, String.format("Missing topstate duration is %d for partition %s",
-          handOffEndTime - handOffStartTime, partition.getPartitionName()));
-      if (clusterStatusMonitor != null) {
-        clusterStatusMonitor
-            .updateMissingTopStateDurationStats(resourceName, handOffEndTime - handOffStartTime,
-                true);
-      }
-    }
-    removeFromStatsMap(missingTopStateMap, resourceName, partition);
-  }
-
-  private void removeFromStatsMap(Map<String, Map<String, Long>> missingTopStateMap,
-      String resourceName, Partition partition) {
-    if (missingTopStateMap.containsKey(resourceName)) {
-      missingTopStateMap.get(resourceName).remove(partition.getPartitionName());
-    }
-
-    if (missingTopStateMap.get(resourceName).size() == 0) {
-      missingTopStateMap.remove(resourceName);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java
new file mode 100644
index 0000000..3aaf326
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java
@@ -0,0 +1,58 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A record entry in cluster data cache containing information about a partition's
+ * missing top state
+ */
+public class MissingTopStateRecord {
+  private final boolean isGracefulHandoff;
+  private final long startTimeStamp;
+  private final long userLatency;
+  private boolean failed;
+
+  public MissingTopStateRecord(long start, long user, boolean graceful) {
+    isGracefulHandoff = graceful;
+    startTimeStamp = start;
+    userLatency = user;
+    failed = false;
+  }
+
+  /* package */ boolean isGracefulHandoff() {
+    return isGracefulHandoff;
+  }
+
+  /* package */ long getStartTimeStamp() {
+    return startTimeStamp;
+  }
+
+  /* package */ long getUserLatency() {
+    return userLatency;
+  }
+
+  /* package */ void setFailed() {
+    failed = true;
+  }
+
+  /* package */ boolean isFailed() {
+    return failed;
+  }
+}
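
For orientation, a minimal usage sketch of the record type above: one MissingTopStateRecord per resource/partition pair, stored in the cache's missing-top-state map and flipped to failed once the handoff exceeds the configured threshold. The example class below is hypothetical and not part of the commit; only MissingTopStateRecord itself and the map shape come from this diff.

package org.apache.helix.controller.stages;

import java.util.HashMap;
import java.util.Map;

// Hypothetical example class, not part of the commit. It lives in the same package so the
// package-private accessors of MissingTopStateRecord are visible.
public class MissingTopStateRecordExample {
  public static void main(String[] args) {
    // Per-resource, per-partition records, mirroring ClusterDataCache#getMissingTopStateMap().
    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap = new HashMap<>();

    // Partition "db_0" of resource "db" just lost its top state; no user latency is known yet
    // (0 ms) and the handoff is assumed graceful.
    long startTime = System.currentTimeMillis();
    if (!missingTopStateMap.containsKey("db")) {
      missingTopStateMap.put("db", new HashMap<String, MissingTopStateRecord>());
    }
    missingTopStateMap.get("db").put("db_0", new MissingTopStateRecord(startTime, 0L, true));

    // If the partition stays without a top state past the configured threshold, the record is
    // marked failed so the failure is only counted once.
    MissingTopStateRecord record = missingTopStateMap.get("db").get("db_0");
    if (!record.isFailed()) {
      record.setFailed();
    }
    System.out.println("graceful=" + record.isGracefulHandoff() + " failed=" + record.isFailed());
  }
}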

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 247d053..4f417fd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -24,6 +24,14 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   public void execute(ClusterEvent event) {
     ClusterDataCache clusterDataCache = event.getAttribute(AttributeName.ClusterDataCache.name());
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+
+    if (clusterDataCache == null || manager == null) {
+      LOG.warn(
+          "ClusterDataCache or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+          event.getEventId(), event.getEventType(), event.getClusterName());
+      return;
+    }
+
     Set<WorkflowConfig> existingWorkflows =
         new HashSet<>(clusterDataCache.getWorkflowConfigMap().values());
     for (WorkflowConfig workflowConfig : existingWorkflows) {

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
new file mode 100644
index 0000000..699f2a8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
@@ -0,0 +1,506 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Observe top state handoff and report latency
+ * TODO: make this stage async
+ */
+public class TopStateHandoffReportStage extends AbstractBaseStage {
+  private static final long DEFAULT_HANDOFF_USER_LATENCY = 0L;
+  private static Logger LOG = LoggerFactory.getLogger(TopStateHandoffReportStage.class);
+  public static final long TIMESTAMP_NOT_RECORDED = -1L;
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
+    final ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    final Long lastPipelineFinishTimestamp = event
+        .getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(),
+            TIMESTAMP_NOT_RECORDED);
+    final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
+    final CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
+    final ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+
+    if (cache == null || resourceMap == null || currentStateOutput == null) {
+      throw new StageException(
+          "Missing critical attributes for stage, requires ClusterDataCache, RESOURCES and CURRENT_STATE");
+    }
+
+    if (cache.isTaskCache()) {
+      throw new StageException("TopStateHandoffReportStage can only be used in resource pipeline");
+    }
+
+    updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput,
+        lastPipelineFinishTimestamp);
+
+  }
+
+  private void updateTopStateStatus(ClusterDataCache cache,
+      ClusterStatusMonitor clusterStatusMonitor, Map<String, Resource> resourceMap,
+      CurrentStateOutput currentStateOutput,
+      long lastPipelineFinishTimestamp) {
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap =
+        cache.getMissingTopStateMap();
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+
+    long durationThreshold = Long.MAX_VALUE;
+    if (cache.getClusterConfig() != null) {
+      durationThreshold = cache.getClusterConfig().getMissTopStateDurationThreshold();
+    }
+
+    // Remove any resource records that no longer exist
+    missingTopStateMap.keySet().retainAll(resourceMap.keySet());
+    lastTopStateMap.keySet().retainAll(resourceMap.keySet());
+
+    for (Resource resource : resourceMap.values()) {
+      StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
+      if (stateModelDef == null) {
+        // Resource does not have a valid state model; skip processing
+        continue;
+      }
+
+      String resourceName = resource.getResourceName();
+
+      for (Partition partition : resource.getPartitions()) {
+        String currentTopStateInstance =
+            findCurrentTopStateLocation(currentStateOutput, resourceName, partition, stateModelDef);
+        String lastTopStateInstance = findCachedTopStateLocation(cache, resourceName, partition);
+
+        if (currentTopStateInstance != null) {
+          reportTopStateExistence(cache, currentStateOutput, stateModelDef, resourceName, partition,
+              lastTopStateInstance, currentTopStateInstance, clusterStatusMonitor,
+              durationThreshold, lastPipelineFinishTimestamp);
+          updateCachedTopStateLocation(cache, resourceName, partition, currentTopStateInstance);
+        } else {
+          reportTopStateMissing(cache, resourceName,
+              partition, stateModelDef.getTopState(), currentStateOutput);
+          reportTopStateHandoffFailIfNecessary(cache, resourceName, partition, durationThreshold,
+              clusterStatusMonitor);
+        }
+      }
+    }
+
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.resetMaxMissingTopStateGauge();
+    }
+  }
+
+  /**
+   * From current state output, find out the location of the top state of given resource
+   * and partition
+   *
+   * @param currentStateOutput current state output
+   * @param resourceName resource name
+   * @param partition partition of the resource
+   * @param stateModelDef state model def object
+   * @return name of the node that contains the top state, null if there is no top state recorded
+   */
+  private String findCurrentTopStateLocation(CurrentStateOutput currentStateOutput,
+      String resourceName, Partition partition, StateModelDefinition stateModelDef) {
+    Map<String, String> stateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
+    for (String instance : stateMap.keySet()) {
+      if (stateMap.get(instance).equals(stateModelDef.getTopState())) {
+        return instance;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Find cached top state location of the given resource and partition
+   *
+   * @param cache cluster data cache object
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @return cached name of the node that contains top state, null if not previously cached
+   */
+  private String findCachedTopStateLocation(ClusterDataCache cache, String resourceName, Partition partition) {
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+    return lastTopStateMap.containsKey(resourceName) && lastTopStateMap.get(resourceName)
+        .containsKey(partition.getPartitionName()) ? lastTopStateMap.get(resourceName)
+        .get(partition.getPartitionName()) : null;
+  }
+
+  /**
+   * Update top state location cache of the given resource and partition
+   *
+   * @param cache cluster data cache object
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param currentTopStateInstance name of the instance that currently has the top state
+   */
+  private void updateCachedTopStateLocation(ClusterDataCache cache, String resourceName,
+      Partition partition, String currentTopStateInstance) {
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+    if (!lastTopStateMap.containsKey(resourceName)) {
+      lastTopStateMap.put(resourceName, new HashMap<String, String>());
+    }
+    lastTopStateMap.get(resourceName).put(partition.getPartitionName(), currentTopStateInstance);
+  }
+
+  /**
+   * When we observe a top state for a given resource and partition, we need to report in the
+   * following two scenarios:
+   *  1. The top state came back, i.e. we have a previously recorded missing top state
+   *  2. The top state location changed, i.e. the current top state location is different
+   *     from what we saw previously
+   *
+   * @param cache cluster data cache
+   * @param currentStateOutput generated after computing current state
+   * @param stateModelDef State model definition object of the given resource
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param lastTopStateInstance our cached top state location
+   * @param currentTopStateInstance top state location we observed during this pipeline run
+   * @param clusterStatusMonitor monitor object
+   * @param durationThreshold top state handoff duration threshold
+   * @param lastPipelineFinishTimestamp timestamp when last pipeline run finished
+   */
+  private void reportTopStateExistence(ClusterDataCache cache, CurrentStateOutput currentStateOutput,
+      StateModelDefinition stateModelDef, String resourceName, Partition partition,
+      String lastTopStateInstance, String currentTopStateInstance,
+      ClusterStatusMonitor clusterStatusMonitor, long durationThreshold,
+      long lastPipelineFinishTimestamp) {
+
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap =
+        cache.getMissingTopStateMap();
+
+    if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
+        .containsKey(partition.getPartitionName())) {
+      // We previously recorded a missing top state, and it's now coming back
+      reportTopStateComesBack(cache, currentStateOutput.getCurrentStateMap(resourceName, partition),
+          resourceName, partition, clusterStatusMonitor, durationThreshold,
+          stateModelDef.getTopState());
+    } else if (lastTopStateInstance != null && !lastTopStateInstance
+        .equals(currentTopStateInstance)) {
+      // With no missing top state record, but top state instance changed,
+      // we observed an entire top state handoff process
+      reportSingleTopStateHandoff(cache, lastTopStateInstance, currentTopStateInstance,
+          resourceName, partition, clusterStatusMonitor, lastPipelineFinishTimestamp);
+    } else {
+      // else, there is no top state change, or the top state first came up; do nothing
+      LogUtil.logDebug(LOG, _eventId, String.format(
+          "No top state hand off or first-seen top state for %s. CurNode: %s, LastNode: %s.",
+          partition.getPartitionName(), currentTopStateInstance, lastTopStateInstance));
+    }
+  }
+
+  /**
+   * This function calculates duration of a full top state handoff, observed in 1 pipeline run,
+   * i.e. current top state instance loaded from ZK is different than the one we cached during
+   * last pipeline run.
+   *
+   * @param cache ClusterDataCache
+   * @param lastTopStateInstance Name of last top state instance we cached
+   * @param curTopStateInstance Name of current top state instance we refreshed from ZK
+   * @param resourceName resource name
+   * @param partition partition object
+   * @param clusterStatusMonitor cluster state monitor object
+   * @param lastPipelineFinishTimestamp last pipeline run finish timestamp
+   */
+  private void reportSingleTopStateHandoff(ClusterDataCache cache, String lastTopStateInstance,
+      String curTopStateInstance, String resourceName, Partition partition,
+      ClusterStatusMonitor clusterStatusMonitor, long lastPipelineFinishTimestamp) {
+    if (curTopStateInstance.equals(lastTopStateInstance)) {
+      return;
+    }
+
+    // Current state output generation logic guarantees that current top state instance
+    // must be a live instance
+    String curTopStateSession = cache.getLiveInstances().get(curTopStateInstance).getSessionId();
+    long endTime =
+        cache.getCurrentState(curTopStateInstance, curTopStateSession).get(resourceName)
+            .getEndTime(partition.getPartitionName());
+    long toTopStateuserLatency =
+        endTime - cache.getCurrentState(curTopStateInstance, curTopStateSession).get(resourceName)
+            .getStartTime(partition.getPartitionName());
+
+    long startTime = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
+    long fromTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+
+    // Make sure last top state instance has not bounced during cluster data cache refresh
+    if (cache.getLiveInstances().containsKey(lastTopStateInstance)) {
+      String lastTopStateSession =
+          cache.getLiveInstances().get(lastTopStateInstance).getSessionId();
+      // We need this null check as there are test cases creating incomplete current state
+      if (cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
+          != null) {
+        startTime =
+            cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
+                .getStartTime(partition.getPartitionName());
+        fromTopStateUserLatency =
+            cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
+                .getEndTime(partition.getPartitionName()) - startTime;
+      }
+    }
+    if (startTime == TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED) {
+      // Either the cached last top state instance is no longer alive, or it bounced during the
+      // cluster data cache refresh; use the last pipeline run's end time as the best guess.
+      // We could compute this more precisely by refreshing data from ZK, but given the rarity
+      // of this corner case, it's not worth it.
+      startTime = lastPipelineFinishTimestamp;
+      fromTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+    }
+
+    if (startTime == TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED || startTime > endTime) {
+      // Top state handoff finished before the end of the last pipeline run, and the instance
+      // that held the previous top state is no longer alive, so our best guess did not work;
+      // ignore the data point for now.
+      LogUtil.logWarn(LOG, _eventId, String
+          .format("Cannot confirm top state missing start time. %s:%s->%s. Likely it was very fast",
+              partition.getPartitionName(), lastTopStateInstance, curTopStateInstance));
+      return;
+    }
+
+    long duration = endTime - startTime;
+    long userLatency = fromTopStateUserLatency + toTopStateuserLatency;
+    // We always treat such a top state handoff as graceful: if the handoff were triggered by an
+    // instance crash, we could not observe the entire handoff process within one pipeline run
+    logMissingTopStateInfo(duration, userLatency, true, partition.getPartitionName());
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor
+          .updateMissingTopStateDurationStats(resourceName, duration, userLatency, true, true);
+    }
+  }
+
+  /**
+   * Check if the given partition of the given resource has a missing top state duration larger
+   * than the threshold, if so, report a top state transition failure
+   *
+   * @param cache cluster data cache
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param durationThreshold top state handoff duration threshold
+   * @param clusterStatusMonitor monitor object
+   */
+  private void reportTopStateHandoffFailIfNecessary(ClusterDataCache cache, String resourceName,
+      Partition partition, long durationThreshold, ClusterStatusMonitor clusterStatusMonitor) {
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap =
+        cache.getMissingTopStateMap();
+    String partitionName = partition.getPartitionName();
+    MissingTopStateRecord record = missingTopStateMap.get(resourceName).get(partitionName);
+    long startTime = record.getStartTimeStamp();
+    if (startTime > 0 && System.currentTimeMillis() - startTime > durationThreshold && !record
+        .isFailed()) {
+      record.setFailed();
+      missingTopStateMap.get(resourceName).put(partitionName, record);
+      if (clusterStatusMonitor != null) {
+        clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, 0L, false, false);
+      }
+    }
+  }
+
+  /**
+   * When we find the top state missing for a given partition, we determine when the top state
+   * went missing and record it in the cache
+   *
+   * @param cache cluster data cache
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param topState top state name
+   * @param currentStateOutput current state output
+   */
+  private void reportTopStateMissing(ClusterDataCache cache, String resourceName, Partition partition,
+      String topState, CurrentStateOutput currentStateOutput) {
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap = cache.getMissingTopStateMap();
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+    if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
+        .containsKey(partition.getPartitionName())) {
+      // a previous missing has been already recorded
+      return;
+    }
+
+    long startTime = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
+    long fromTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+    boolean isGraceful = true;
+
+    // 1. try to find the previous topstate missing event for the startTime.
+    String missingStateInstance = null;
+    if (lastTopStateMap.containsKey(resourceName)) {
+      missingStateInstance = lastTopStateMap.get(resourceName).get(partition.getPartitionName());
+    }
+
+    if (missingStateInstance != null) {
+      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+      if (liveInstances.containsKey(missingStateInstance)) {
+        CurrentState currentState = cache.getCurrentState(missingStateInstance,
+            liveInstances.get(missingStateInstance).getSessionId()).get(resourceName);
+
+        if (currentState != null
+            && currentState.getPreviousState(partition.getPartitionName()) != null && currentState
+            .getPreviousState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
+
+          // Update the latest start time only for a transition from the top state to another
+          // state. Initially the start time is -1 (not recorded). If the instance is not alive,
+          // or it just started serving that partition, Helix does not know the previous start
+          // or end time, so we count from the current time.
+          //
+          // The previous state being the top state does not mean the resource has only one top
+          // state (i.e. Online/Offline), so Helix uses the latest start time as the starting point.
+          long fromTopStateStartTime = currentState.getStartTime(partition.getPartitionName());
+          if (fromTopStateStartTime > startTime) {
+            startTime = fromTopStateStartTime;
+            fromTopStateUserLatency =
+                currentState.getEndTime(partition.getPartitionName()) - startTime;
+          }
+          startTime = Math.max(startTime, currentState.getStartTime(partition.getPartitionName()));
+        } // Else no related state transition history found, use current time as the missing start time.
+      } else {
+        // If the previous topState holder is no longer alive, its offline time is used as the
+        // start time. Also, if we observe a missing top state and the previous top state node is
+        // gone, the top state handoff is not graceful
+        isGraceful = false;
+        Map<String, Long> offlineMap = cache.getInstanceOfflineTimeMap();
+        if (offlineMap.containsKey(missingStateInstance)) {
+          startTime = Math.max(startTime, offlineMap.get(missingStateInstance));
+        }
+      }
+    }
+
+    // 2. If there is no previous top state record, either the resource was just created or there
+    // was a controller leadership change. Check any pending messages created for the top state
+    // transition. Assume this is a graceful top state handoff: if the instance holding the
+    // previous top state had crashed, no such message would have been recorded.
+    if (startTime == TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED) {
+      for (Message message : currentStateOutput.getPendingMessageMap(resourceName, partition)
+          .values()) {
+        // Only messages that match the current session ID will be recorded in the map.
+        // So no need to redundantly check here.
+        if (message.getToState().equals(topState)) {
+          startTime = Math.max(startTime, message.getCreateTimeStamp());
+        }
+      }
+    }
+
+    // 3. If there is no clue about the previous top state or any related pending message, it could be:
+    //    a. the resource was just created
+    //    b. a controller leader switch (the actual handoff could be either graceful or non-graceful)
+    //
+    // Use the current system time as the missing top state start time and assume graceful.
+    // TODO: revisit this case and see if it can be better addressed
+    if (startTime == TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED) {
+      LogUtil.logWarn(LOG, _eventId,
+          "Cannot confirm top state missing start time. Use the current system time as the start time.");
+      startTime = System.currentTimeMillis();
+    }
+
+    if (!missingTopStateMap.containsKey(resourceName)) {
+      missingTopStateMap.put(resourceName, new HashMap<String, MissingTopStateRecord>());
+    }
+
+    missingTopStateMap.get(resourceName).put(partition.getPartitionName(),
+        new MissingTopStateRecord(startTime, fromTopStateUserLatency, isGraceful));
+  }
+
+  /**
+   * When we see a top state come back, i.e. we observe a top state in this pipeline run,
+   * but have a top state missing record before, we need to remove the top state missing
+   * record and report top state handoff duration
+   *
+   * @param cache cluster data cache
+   * @param stateMap state map of the given partition of the given resource
+   * @param resourceName resource name
+   * @param partition partition of the resource
+   * @param clusterStatusMonitor monitor object
+   * @param threshold top state handoff threshold
+   * @param topState name of the top state
+   */
+  private void reportTopStateComesBack(ClusterDataCache cache, Map<String, String> stateMap, String resourceName,
+      Partition partition, ClusterStatusMonitor clusterStatusMonitor, long threshold,
+      String topState) {
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap =
+        cache.getMissingTopStateMap();
+    MissingTopStateRecord record =
+        missingTopStateMap.get(resourceName).get(partition.getPartitionName());
+    long handOffStartTime = record.getStartTimeStamp();
+    long fromTopStateUserLatency = record.getUserLatency();
+
+    // Find the earliest end time from the top states and the corresponding user latency
+    long handOffEndTime = Long.MAX_VALUE;
+    long toTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+    for (String instanceName : stateMap.keySet()) {
+      CurrentState currentState =
+          cache.getCurrentState(instanceName, liveInstances.get(instanceName).getSessionId())
+              .get(resourceName);
+      if (currentState.getState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
+        if (currentState.getEndTime(partition.getPartitionName()) <= handOffEndTime) {
+          handOffEndTime = currentState.getEndTime(partition.getPartitionName());
+          toTopStateUserLatency =
+              handOffEndTime - currentState.getStartTime(partition.getPartitionName());
+        }
+      }
+    }
+
+    if (handOffStartTime > 0 && handOffEndTime - handOffStartTime <= threshold) {
+      long duration = handOffEndTime - handOffStartTime;
+      long userLatency = fromTopStateUserLatency + toTopStateUserLatency;
+      // It is possible that during a controller leader switch we lost the previous master
+      // information and used the current time to approximate the start time. If the actual user
+      // latency is larger than the estimated duration, use the user latency instead.
+      duration = Math.max(duration, userLatency);
+      boolean isGraceful = record.isGracefulHandoff();
+      logMissingTopStateInfo(duration, userLatency, isGraceful, partition.getPartitionName());
+
+      if (clusterStatusMonitor != null) {
+        clusterStatusMonitor
+            .updateMissingTopStateDurationStats(resourceName, duration, userLatency, isGraceful,
+                true);
+      }
+    }
+    removeFromStatsMap(missingTopStateMap, resourceName, partition);
+  }
+
+  private void removeFromStatsMap(
+      Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap, String resourceName,
+      Partition partition) {
+    if (missingTopStateMap.containsKey(resourceName)) {
+      missingTopStateMap.get(resourceName).remove(partition.getPartitionName());
+    }
+
+    if (missingTopStateMap.get(resourceName).size() == 0) {
+      missingTopStateMap.remove(resourceName);
+    }
+  }
+
+  private void logMissingTopStateInfo(long totalDuration, long userLatency, boolean isGraceful,
+      String partitionName) {
+    LogUtil.logInfo(LOG, _eventId, String
+        .format("Missing top state duration is %s/%s for partition %s. Graceful: %s", userLatency,
+            totalDuration, partitionName, isGraceful));
+  }
+}
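
For readers who want to trace the arithmetic in reportTopStateComesBack without the surrounding cache plumbing, the following standalone sketch mirrors the same computation. The ReplicaState class and estimateHandoff method are hypothetical simplifications introduced here only for illustration; they are not Helix classes.

import java.util.Arrays;
import java.util.List;

public class HandoffEstimatorSketch {
  /** Hypothetical stand-in for one replica's current-state entry. */
  static class ReplicaState {
    final String state;
    final long startTime;
    final long endTime;

    ReplicaState(String state, long startTime, long endTime) {
      this.state = state;
      this.startTime = startTime;
      this.endTime = endTime;
    }
  }

  /**
   * Returns {reportedDuration, userLatency}: the earliest time any replica reached the top state
   * minus the handoff start time, plus the user latency on the old and new top-state hosts.
   */
  static long[] estimateHandoff(long handoffStartTime, long fromTopStateUserLatency,
      List<ReplicaState> replicas, String topState) {
    long earliestEnd = Long.MAX_VALUE;
    long toTopStateUserLatency = 0L;
    for (ReplicaState replica : replicas) {
      // Pick the replica that reached the top state first; its end time bounds the handoff.
      if (replica.state.equalsIgnoreCase(topState) && replica.endTime <= earliestEnd) {
        earliestEnd = replica.endTime;
        toTopStateUserLatency = replica.endTime - replica.startTime;
      }
    }
    long totalDuration = earliestEnd - handoffStartTime;
    long userLatency = fromTopStateUserLatency + toTopStateUserLatency;
    // If the start time was only approximated (e.g. after a controller switch), the measured
    // user latency can exceed the estimated duration, so report whichever is larger.
    return new long[] { Math.max(totalDuration, userLatency), userLatency };
  }

  public static void main(String[] args) {
    List<ReplicaState> replicas = Arrays.asList(
        new ReplicaState("MASTER", 1000L, 1250L),   // new master, 250ms user latency
        new ReplicaState("SLAVE", 900L, 950L));     // unrelated transition on another replica
    long[] result = estimateHandoff(800L, 100L, replicas, "MASTER");
    System.out.println("reported duration=" + result[0] + "ms, user latency=" + result[1] + "ms");
  }
}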

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index e3655d8..f870ddc 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -22,6 +22,21 @@ package org.apache.helix.monitoring.mbeans;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -38,22 +53,6 @@ import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
 
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusMonitor.class);
@@ -446,11 +445,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public synchronized void updateMissingTopStateDurationStats(String resourceName, long duration, boolean succeeded) {
+  public synchronized void updateMissingTopStateDurationStats(String resourceName,
+      long totalDuration, long userLatency, boolean isGraceful, boolean succeeded) {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
     if (resourceMonitor != null) {
-      resourceMonitor.updateStateHandoffStats(ResourceMonitor.MonitorState.TOP_STATE, duration, succeeded);
+      resourceMonitor.updateStateHandoffStats(ResourceMonitor.MonitorState.TOP_STATE, totalDuration,
+          userLatency, isGraceful, succeeded);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index a103a0c..c3dd242 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -21,7 +21,15 @@ package org.apache.helix.monitoring.mbeans;
 
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.ObjectName;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -31,10 +39,6 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
-import javax.management.JMException;
-import javax.management.ObjectName;
-import java.util.*;
-
 public class ResourceMonitor extends DynamicMBeanProvider {
 
   // Gauges
@@ -60,6 +64,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
 
   // Histograms
   private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge;
+  private HistogramDynamicMetric _partitionTopStateHandoffUserLatencyGauge;
+  private HistogramDynamicMetric _partitionTopStateNonGracefulHandoffDurationGauge;
 
   private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private long _lastResetTime;
@@ -86,6 +92,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     attributeList.add(_failedTopStateHandoffCounter);
     attributeList.add(_maxSinglePartitionTopStateHandoffDuration);
     attributeList.add(_partitionTopStateHandoffDurationGauge);
+    attributeList.add(_partitionTopStateHandoffUserLatencyGauge);
+    attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge);
     attributeList.add(_totalMessageReceived);
     attributeList.add(_numPendingStateTransitions);
     doRegister(attributeList, _initObjectName);
@@ -96,6 +104,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     TOP_STATE
   }
 
+  @SuppressWarnings("unchecked")
   public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) {
     _clusterName = clusterName;
     _resourceName = resourceName;
@@ -122,6 +131,14 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _partitionTopStateHandoffDurationGauge =
         new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram(
             new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+
+    _partitionTopStateHandoffUserLatencyGauge =
+        new HistogramDynamicMetric("PartitionTopStateHandoffUserLatencyGauge", new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _partitionTopStateNonGracefulHandoffDurationGauge =
+        new HistogramDynamicMetric("PartitionTopStateNonGracefulHandoffGauge", new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+
     _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0L);
     _maxSinglePartitionTopStateHandoffDuration =
         new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0L);
@@ -165,6 +182,18 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     return _maxSinglePartitionTopStateHandoffDuration.getValue();
   }
 
+  public HistogramDynamicMetric getPartitionTopStateHandoffDurationGauge() {
+    return _partitionTopStateHandoffDurationGauge;
+  }
+
+  public HistogramDynamicMetric getPartitionTopStateNonGracefulHandoffDurationGauge() {
+    return _partitionTopStateNonGracefulHandoffDurationGauge;
+  }
+
+  public HistogramDynamicMetric getPartitionTopStateHandoffUserLatencyGauge() {
+    return _partitionTopStateHandoffUserLatencyGauge;
+  }
+
   public long getFailedTopStateHandoffCounter() {
     return _failedTopStateHandoffCounter.getValue();
   }
@@ -310,25 +339,31 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _numPendingStateTransitions.updateValue((long) messageCount);
   }
 
-  public void updateStateHandoffStats(MonitorState monitorState, long duration, boolean succeeded) {
+  public void updateStateHandoffStats(MonitorState monitorState, long totalDuration,
+      long userLatency, boolean isGraceful, boolean succeeded) {
     switch (monitorState) {
-      case TOP_STATE:
-        if (succeeded) {
-          _successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue() + 1);
-          _successfulTopStateHandoffDurationCounter
-              .updateValue(_successfulTopStateHandoffDurationCounter.getValue() + duration);
-          _partitionTopStateHandoffDurationGauge.updateValue(duration);
-          if (duration > _maxSinglePartitionTopStateHandoffDuration.getValue()) {
-            _maxSinglePartitionTopStateHandoffDuration.updateValue(duration);
-            _lastResetTime = System.currentTimeMillis();
-          }
+    case TOP_STATE:
+      if (succeeded) {
+        _successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue() + 1);
+        _successfulTopStateHandoffDurationCounter
+            .updateValue(_successfulTopStateHandoffDurationCounter.getValue() + totalDuration);
+        if (isGraceful) {
+          _partitionTopStateHandoffDurationGauge.updateValue(totalDuration);
+          _partitionTopStateHandoffUserLatencyGauge.updateValue(userLatency);
         } else {
-          _failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue() + 1);
+          _partitionTopStateNonGracefulHandoffDurationGauge.updateValue(totalDuration);
         }
-        break;
-      default:
-        _logger.warn(
-            String.format("Wrong monitor state \"%s\" that not supported ", monitorState.name()));
+        if (totalDuration > _maxSinglePartitionTopStateHandoffDuration.getValue()) {
+          _maxSinglePartitionTopStateHandoffDuration.updateValue(totalDuration);
+          _lastResetTime = System.currentTimeMillis();
+        }
+      } else {
+        _failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue() + 1);
+      }
+      break;
+    default:
+      _logger.warn(
+          String.format("Wrong monitor state \"%s\" that not supported ", monitorState.name()));
     }
   }
 
@@ -379,4 +414,4 @@ public class ResourceMonitor extends DynamicMBeanProvider {
       _lastResetTime = System.currentTimeMillis();
     }
   }
-}
\ No newline at end of file
+}
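
The two new gauges added above are Codahale histograms backed by a sliding-time-window reservoir, split by the graceful flag. Here is a standalone sketch of that pattern; the one-hour window and the sample values are assumptions for illustration, not Helix's actual reset interval.

import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
import java.util.concurrent.TimeUnit;

public class HandoffHistogramSketch {
  private static final long WINDOW_MS = 60L * 60L * 1000L; // assumed 1-hour window

  private final Histogram gracefulDuration =
      new Histogram(new SlidingTimeWindowArrayReservoir(WINDOW_MS, TimeUnit.MILLISECONDS));
  private final Histogram gracefulUserLatency =
      new Histogram(new SlidingTimeWindowArrayReservoir(WINDOW_MS, TimeUnit.MILLISECONDS));
  private final Histogram nonGracefulDuration =
      new Histogram(new SlidingTimeWindowArrayReservoir(WINDOW_MS, TimeUnit.MILLISECONDS));

  // Route a finished handoff into the graceful or non-graceful histograms.
  void record(long totalDuration, long userLatency, boolean isGraceful) {
    if (isGraceful) {
      gracefulDuration.update(totalDuration);
      gracefulUserLatency.update(userLatency);
    } else {
      nonGracefulDuration.update(totalDuration);
    }
  }

  public static void main(String[] args) {
    HandoffHistogramSketch sketch = new HandoffHistogramSketch();
    sketch.record(450L, 350L, true);    // graceful handoff
    sketch.record(5000L, 0L, false);    // non-graceful handoff, e.g. after a participant crash
    System.out.println("graceful max duration: "
        + sketch.gracefulDuration.getSnapshot().getMax());
    System.out.println("non-graceful max duration: "
        + sketch.nonGracefulDuration.getSnapshot().getMax());
  }
}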

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 0225f83..ea529e8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -861,7 +861,7 @@ public class TaskDriver {
     if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) {
       throw new HelixException(String.format(
           "Workflow \"%s\" context is empty or not in states: \"%s\", current state: \"%s\"",
-          workflowName, targetStates.toString(),
+          workflowName, Arrays.asList(targetStates),
           ctx == null ? "null" : ctx.getWorkflowState().toString()));
     }
 

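The one-line TaskDriver fix above matters because calling toString() directly on a Java array yields only the class-name-and-hash form, whereas wrapping the array in Arrays.asList() (like Arrays.toString()) renders its elements. A quick standalone demonstration:

import java.util.Arrays;

public class ArrayToStringDemo {
  public static void main(String[] args) {
    String[] targetStates = { "COMPLETED", "FAILED" };
    // Prints something like "[Ljava.lang.String;@1b6d3586" -- useless in an error message.
    System.out.println(targetStates.toString());
    // Prints "[COMPLETED, FAILED]".
    System.out.println(Arrays.asList(targetStates));
  }
}
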
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 7495078..5509858 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -173,42 +173,56 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
 
   @Test
   public void testReassignment() throws Exception {
-    final int NUM_INSTANCES = 5;
-    String jobName = TestHelper.getTestMethodName();
-    Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+    String workflowName = TestHelper.getTestMethodName();
+    String jobNameSuffix = "job";
+    String jobName = String.format("%s_%s", workflowName, jobNameSuffix);
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+
+    // This is error prone: ThreadCountBasedTaskAssigner will always re-assign the task
+    // to the same instance, since we only have 1 task to assign and the order of iterating
+    // all nodes during assignment is always the same. Occasionally a change alters the
+    // iteration order during assignment, in which case this instance name needs to be
+    // updated to keep testing this functionality.
+    final String failInstance = "localhost_12919";
     Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true,
-        "failInstance", PARTICIPANT_PREFIX + '_' + (_startPort + 1)));
+        "failInstance", failInstance));
     TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap);
     taskConfigs.add(taskConfig1);
     Map<String, String> jobCommandMap = Maps.newHashMap();
     jobCommandMap.put("Timeout", "1000");
 
-    JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
-        .addTaskConfigs(taskConfigs).setJobCommandConfigMap(jobCommandMap);
-    workflowBuilder.addJob(jobName, jobBuilder);
+    // Retry forever
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+            .setJobCommandConfigMap(jobCommandMap).setMaxAttemptsPerTask(Integer.MAX_VALUE);
+    workflowBuilder.addJob(jobNameSuffix, jobBuilder);
 
     _driver.start(workflowBuilder.build());
 
-    // Ensure the job completes
-    _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
-    _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
+    // Poll to ensure that the task gets re-attempted first
+    int trial = 0;
+    while (trial < 1000) { // 100 sec
+      JobContext jctx = _driver.getJobContext(jobName);
+      if (jctx != null && jctx.getPartitionNumAttempts(0) > 1) {
+        break;
+      }
+      Thread.sleep(100);
+      trial += 1;
+    }
+
+    if (trial == 1000) {
+      Assert.fail("Job " + jobName + " is not retried");
+    }
+
+    // disable failed instance
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, failInstance, false);
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
 
     // Ensure that the class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
-
-    // Ensure that this was tried on two different instances, the first of which exhausted the
-    // attempts number, and the other passes on the first try -> See below
-
-    // TEST FIX: After quota-based scheduling support, we use a different assignment strategy (not
-    // consistent hashing), which does not necessarily guarantee that failed tasks will be assigned
-    // on a different instance. The parameters for this test are adjusted accordingly
-    // Also, hard-coding the instance name (line 184) is not a reliable way of testing whether
-    // re-assignment took place, so this test is no longer valid and will always pass
-    Assert.assertEquals(_runCounts.size(), 1);
-    // Assert.assertTrue(
-    // _runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
-    Assert.assertTrue(_runCounts.values().contains(1));
+    Assert.assertNotEquals(_driver.getJobContext(jobName).getAssignedParticipant(0), failInstance);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, failInstance, true);
   }
 
   @Test