You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/30 23:08:12 UTC

[1/2] helix git commit: [HELIX-771] More detailed top state handoff metrics

Repository: helix
Updated Branches:
  refs/heads/master 9d7364d7a -> 7e49f995e


http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
index fac957c..c1cfce0 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
@@ -21,37 +21,118 @@ package org.apache.helix.monitoring.mbeans;
 
 import com.google.common.collect.Range;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.TopStateHandoffReportStage;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Resource;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
 import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class TestTopStateHandoffMetrics extends BaseStageTest {
   public final static String TEST_INPUT_FILE = "TestTopStateHandoffMetrics.json";
-  public final static String INITIAL_CURRENT_STATES = "initialCurrentStates";
-  public final static String MISSING_TOP_STATES = "MissingTopStates";
-  public final static String HANDOFF_CURRENT_STATES = "handoffCurrentStates";
-  public final static String EXPECTED_DURATION = "expectedDuration";
   public final static String TEST_RESOURCE = "TestResource";
   public final static String PARTITION = "PARTITION";
 
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final String NON_GRACEFUL_HANDOFF_DURATION = "PartitionTopStateNonGracefulHandoffGauge.Max";
+  private static final String GRACEFUL_HANDOFF_DURATION = "PartitionTopStateHandoffDurationGauge.Max";
+  private static final String HANDOFF_USER_LATENCY = "PartitionTopStateHandoffUserLatencyGauge.Max";
+  private static final Range<Long> DURATION_ZERO = Range.closed(0L, 0L);
+  private TestConfig config;
+
+  @BeforeClass
+  public void beforeClass() {
+    super.beforeClass();
+    try {
+      config = OBJECT_MAPPER
+          .readValue(getClass().getClassLoader().getResourceAsStream(TEST_INPUT_FILE), TestConfig.class);
+    } catch (IOException e) { // fail fast: a null config would only NPE later in the data providers
+      throw new IllegalStateException("Failed to load test input " + TEST_INPUT_FILE, e);
+    }
+  }
+
+  private static class CurrentStateInfo {
+    String currentState;
+    String previousState;
+    long startTime;
+    long endTime;
+
+    @JsonCreator
+    public CurrentStateInfo(
+        @JsonProperty("CurrentState") String cs,
+        @JsonProperty("PreviousState") String ps,
+        @JsonProperty("StartTime") long start,
+        @JsonProperty("EndTime") long end
+    ) {
+      currentState = cs;
+      previousState = ps;
+      startTime = start;
+      endTime = end;
+    }
+  }
+
+  private static class TestCaseConfig {
+    final Map<String, CurrentStateInfo> initialCurrentStates;
+    final Map<String, CurrentStateInfo> currentStateWithMissingTopState;
+    final Map<String, CurrentStateInfo> finalCurrentState;
+    final long duration;
+    final boolean isGraceful;
+    final long userLatency;
+
+    @JsonCreator
+    public TestCaseConfig(
+        @JsonProperty("InitialCurrentStates") Map<String, CurrentStateInfo> initial,
+        @JsonProperty("MissingTopStates") Map<String, CurrentStateInfo> missing,
+        @JsonProperty("HandoffCurrentStates") Map<String, CurrentStateInfo> handoff,
+        @JsonProperty("Duration") long d,
+        @JsonProperty("UserLatency") long user,
+        @JsonProperty("IsGraceful") boolean graceful
+    ) {
+      initialCurrentStates = initial;
+      currentStateWithMissingTopState = missing;
+      finalCurrentState = handoff;
+      duration = d;
+      userLatency = user;
+      isGraceful = graceful;
+    }
+  }
+
+  private static class TestConfig {
+    final List<TestCaseConfig> succeeded;
+    final List<TestCaseConfig> failed;
+    final List<TestCaseConfig> fast;
+    final List<TestCaseConfig> succeededNonGraceful;
+
+    @JsonCreator
+    public TestConfig(
+        @JsonProperty("succeeded") List<TestCaseConfig> succeededCfg,
+        @JsonProperty("failed") List<TestCaseConfig> failedCfg,
+        @JsonProperty("fast") List<TestCaseConfig> fastCfg,
+        @JsonProperty("succeededNonGraceful") List<TestCaseConfig> nonGraceful
+    ) {
+      succeeded = succeededCfg;
+      failed = failedCfg;
+      fast = fastCfg;
+      succeededNonGraceful = nonGraceful;
+    }
+  }
+
   private void preSetup() {
     setupLiveInstances(3);
     setupStateModel();
@@ -61,165 +142,159 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
     event.addAttribute(AttributeName.RESOURCES.name(),
         Collections.singletonMap(TEST_RESOURCE, resource));
     event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(),
-        CurrentStateComputationStage.NOT_RECORDED);
+        TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED);
     ClusterStatusMonitor monitor = new ClusterStatusMonitor("TestCluster");
     monitor.active();
     event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
   }
 
   @Test(dataProvider = "successCurrentStateInput")
-  public void testTopStateSuccessHandoff(Map<String, Map<String, String>> initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
-    preSetup();
-    runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates, null, 1, 0,
-        Range.closed(expectedDuration, expectedDuration),
-        Range.closed(expectedDuration, expectedDuration));
+  public void testTopStateSuccessHandoff(TestCaseConfig cfg) {
+    runTestWithNoInjection(cfg, false);
   }
 
   @Test(dataProvider = "fastCurrentStateInput")
-  public void testFastTopStateHandoffWithNoMissingTopState(
-      Map<String, Map<String, String>> initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration
-  ) {
-    preSetup();
-    runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates, null, 1, 0,
-        Range.closed(expectedDuration, expectedDuration),
-        Range.closed(expectedDuration, expectedDuration));
+  public void testFastTopStateHandoffWithNoMissingTopState(TestCaseConfig cfg) {
+    runTestWithNoInjection(cfg, false);
   }
 
   @Test(dataProvider = "fastCurrentStateInput")
-  public void testFastTopStateHandoffWithNoMissingTopStateAndOldInstanceCrash(
-      Map<String, Map<String, String>> initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration
-  ) {
+  public void testFastTopStateHandoffWithNoMissingTopStateAndOldInstanceCrash(TestCaseConfig cfg) {
     preSetup();
     event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), 7500L);
     // By simulating last master instance crash, we now have:
     //  - M->S from 6000 to 7000
     //  - lastPipelineFinishTimestamp is 7500
     //  - S->M from 8000 to 9000
-    // Therefore the recorded latency should be 9000 - 7500 = 1500
+    // Therefore the recorded latency should be 9000 - 7500 = 1500. Although the original master
+    // crashed, this is a single top state handoff observed within one pipeline, so we treat it as
+    // graceful and only record the user latency of the transition to master.
+    Range<Long> expectedDuration = Range.closed(1500L, 1500L);
+    Range<Long> expectedUserLatency = Range.closed(1000L, 1000L);
     runStageAndVerify(
-        initialCurrentStates, missingTopStates, handOffCurrentStates,
+        cfg.initialCurrentStates, cfg.currentStateWithMissingTopState, cfg.finalCurrentState,
         new MissingStatesDataCacheInject() {
           @Override
           public void doInject(ClusterDataCache cache) {
             cache.getLiveInstances().remove("localhost_1");
           }
         }, 1, 0,
-        Range.closed(1500L, 1500L),
-        Range.closed(1500L, 1500L)
+        expectedDuration,
+        DURATION_ZERO,
+        expectedDuration,
+        expectedUserLatency
     );
     event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(),
-        CurrentStateComputationStage.NOT_RECORDED);
+        TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED);
   }
 
-  @Test(dataProvider = "failedCurrentStateInput")
-  public void testTopStateFailedHandoff(Map<String, Map<String, String>> initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
+  @Test(dataProvider = "succeededNonGraceful")
+  public void testTopStateSuccessfulYetNonGracefulHandoff(TestCaseConfig cfg) {
+    // localhost_0 crashed at 15000
+    // localhost_1 slave -> master started 20000, ended 22000, top state handoff = 7000
     preSetup();
-    ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
-    clusterConfig.setMissTopStateDurationThreshold(5000L);
-    setClusterConfig(clusterConfig);
-
-    runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates, null, 0, 1,
-        Range.closed(expectedDuration, expectedDuration),
-        Range.closed(expectedDuration, expectedDuration));
-  }
-
-  // Test handoff that are triggered by an offline master instance
-  @Test(dataProvider = "successCurrentStateInput", dependsOnMethods = "testTopStateSuccessHandoff")
-  public void testTopStateSuccessHandoffWithOfflineNode(
-      final Map<String, Map<String, String>> initialCurrentStates,
-      final Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
-    final long offlineTimeBeforeMasterless = 125;
-
-    preSetup();
-    long durationToVerify = expectedDuration + offlineTimeBeforeMasterless;
-    runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates,
+    final String downInstance = "localhost_0";
+    final Long lastOfflineTime = 15000L;
+    Range<Long> expectedDuration = Range.closed(7000L, 7000L);
+    runStageAndVerify(
+        cfg.initialCurrentStates, cfg.currentStateWithMissingTopState, cfg.finalCurrentState,
         new MissingStatesDataCacheInject() {
           @Override
           public void doInject(ClusterDataCache cache) {
-            Set<String> topStateNodes = new HashSet<>();
-            for (String instance : initialCurrentStates.keySet()) {
-              if (initialCurrentStates.get(instance).get("CurrentState").equals("MASTER")) {
-                topStateNodes.add(instance);
-              }
-            }
-            // Simulate the previous top state instance goes offline
-            if (!topStateNodes.isEmpty()) {
-              cache.getLiveInstances().keySet().removeAll(topStateNodes);
-              for (String topStateNode : topStateNodes) {
-                long originalStartTime =
-                    Long.parseLong(missingTopStates.get(topStateNode).get("StartTime"));
-                cache.getInstanceOfflineTimeMap()
-                    .put(topStateNode, originalStartTime - offlineTimeBeforeMasterless);
-              }
-            }
+            accessor.removeProperty(accessor.keyBuilder().liveInstance(downInstance));
+            cache.getLiveInstances().remove("localhost_0");
+            cache.getInstanceOfflineTimeMap().put("localhost_0", lastOfflineTime);
+            cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
           }
-        }, 1, 0, Range.closed(durationToVerify, durationToVerify),
-        Range.closed(durationToVerify, durationToVerify));
+        }, 1, 0,
+        DURATION_ZERO, // graceful handoff duration should be 0
+        expectedDuration, // we should have a record for non-graceful handoff
+        expectedDuration, // max handoff should be same as non-graceful handoff
+        DURATION_ZERO // we don't record user latency for non-graceful transition
+    );
+  }
+
+  @Test(dataProvider = "failedCurrentStateInput")
+  public void testTopStateFailedHandoff(TestCaseConfig cfg) {
+    ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
+    clusterConfig.setMissTopStateDurationThreshold(5000L);
+    setClusterConfig(clusterConfig);
+    runTestWithNoInjection(cfg, true);
   }
 
   // Test success with no available clue about previous master.
   // For example, controller is just changed to a new node.
-  @Test(dataProvider = "successCurrentStateInput", dependsOnMethods = "testTopStateSuccessHandoff")
-  public void testHandoffDurationWithDefaultStartTime(
-      Map<String, Map<String, String>> initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
+  @Test(
+      dataProvider = "successCurrentStateInput",
+      dependsOnMethods = "testHandoffDurationWithPendingMessage"
+  )
+  public void testHandoffDurationWithDefaultStartTime(final TestCaseConfig cfg) {
     preSetup();
 
-    // reset expectedDuration since current system time would be used
-    for (Map<String, String> states : handOffCurrentStates.values()) {
-      if (states.get("CurrentState").equals("MASTER")) {
-        expectedDuration = Long.parseLong(states.get("EndTime")) - System.currentTimeMillis();
+    // No initialCurrentStates means no input can be used as a clue about the previous master.
+    // In that case, reportTopStateMissing will use the current system time as the missing top
+    // state start time; we assume it is a graceful handoff, and only the "to top state" user
+    // latency will be recorded.
+    long userLatency = 0;
+    for (CurrentStateInfo info : cfg.currentStateWithMissingTopState.values()) {
+      if (info.previousState.equals("MASTER")) {
+        userLatency = cfg.userLatency - (info.endTime - info.startTime);
+      }
+      info.previousState = "OFFLINE";
+    }
+    for (CurrentStateInfo states : cfg.finalCurrentState.values()) {
+      if (states.currentState.equals("MASTER")) {
+        states.endTime = System.currentTimeMillis();
+        states.startTime = System.currentTimeMillis() - userLatency;
         break;
       }
     }
-
-    // No initialCurrentStates means no input can be used as the clue of the previous master.
-    runStageAndVerify(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates, null, 1, 0,
-        Range.atMost(1500L), Range.atMost(1500L));
+    runStageAndVerify(Collections.EMPTY_MAP, cfg.currentStateWithMissingTopState,
+        cfg.finalCurrentState, null, 1, 0,
+        Range.closed(userLatency, userLatency),
+        DURATION_ZERO,
+        Range.closed(userLatency, userLatency),
+        Range.closed(userLatency, userLatency)
+    );
   }
 
   /**
    * Test success with only a pending message as the clue.
    * For instance, if the master was dropped, there is no way to track the dropping time.
    * So either use current system time.
-   *
    * @see org.apache.helix.monitoring.mbeans.TestTopStateHandoffMetrics#testHandoffDurationWithDefaultStartTime
    * Or we can check if any pending message to be used as the start time.
    */
   @Test(dataProvider = "successCurrentStateInput", dependsOnMethods = "testTopStateSuccessHandoff")
-  public void testHandoffDurationWithPendingMessage(
-      final Map<String, Map<String, String>> initialCurrentStates,
-      final Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
+  public void testHandoffDurationWithPendingMessage(final TestCaseConfig cfg) {
     final long messageTimeBeforeMasterless = 145;
     preSetup();
 
-    long durationToVerify = expectedDuration + messageTimeBeforeMasterless;
+    long durationToVerify = cfg.duration + messageTimeBeforeMasterless;
+    long userLatency = 0;
+    for (CurrentStateInfo info : cfg.finalCurrentState.values()) {
+      if (info.currentState.equals("MASTER")) {
+        userLatency = info.endTime - info.startTime;
+      }
+    }
+
     // No initialCurrentStates means no input can be used as the clue of the previous master.
-    runStageAndVerify(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates,
+    // In this case, we treat the handoff as graceful, and only the to-master user latency
+    // is recorded.
+    runStageAndVerify(
+        Collections.EMPTY_MAP, cfg.currentStateWithMissingTopState, cfg.finalCurrentState,
         new MissingStatesDataCacheInject() {
           @Override public void doInject(ClusterDataCache cache) {
             String topStateNode = null;
-            for (String instance : initialCurrentStates.keySet()) {
-              if (initialCurrentStates.get(instance).get("CurrentState").equals("MASTER")) {
+            for (String instance : cfg.initialCurrentStates.keySet()) {
+              if (cfg.initialCurrentStates.get(instance).currentState.equals("MASTER")) {
                 topStateNode = instance;
                 break;
               }
             }
             // Simulate the previous top state instance goes offline
             if (topStateNode != null) {
-              long originalStartTime =
-                  Long.parseLong(missingTopStates.get(topStateNode).get("StartTime"));
+              long originalStartTime = cfg.currentStateWithMissingTopState.get(topStateNode).startTime;
               // Inject a message that fit expectedDuration
               Message message =
                   new Message(Message.MessageType.STATE_TRANSITION, "thisisafakemessage");
@@ -232,112 +307,112 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
               cache.cacheMessages(Collections.singletonList(message));
             }
           }
-        }, 1, 0, Range.closed(durationToVerify, durationToVerify),
-        Range.closed(durationToVerify, durationToVerify));
+        }, 1, 0,
+        Range.closed(durationToVerify, durationToVerify),
+        DURATION_ZERO,
+        Range.closed(durationToVerify, durationToVerify),
+        Range.closed(userLatency, userLatency));
   }
 
-  private final String CURRENT_STATE = "CurrentState";
-  private final String PREVIOUS_STATE = "PreviousState";
-  private final String START_TIME = "StartTime";
-  private final String END_TIME = "EndTime";
-
   @DataProvider(name = "successCurrentStateInput")
   public Object[][] successCurrentState() {
-    return loadInputData("succeeded");
+    return testCaseConfigListToObjectArray(config.succeeded);
   }
 
   @DataProvider(name = "failedCurrentStateInput")
   public Object[][] failedCurrentState() {
-    return loadInputData("failed");
+    return testCaseConfigListToObjectArray(config.failed);
   }
 
   @DataProvider(name = "fastCurrentStateInput")
   public Object[][] fastCurrentState() {
-    return loadInputData("fast");
+    return testCaseConfigListToObjectArray(config.fast);
   }
 
-  private Object[][] loadInputData(String inputEntry) {
-    Object[][] inputData = null;
-    InputStream inputStream = getClass().getClassLoader().getResourceAsStream(TEST_INPUT_FILE);
+  @DataProvider(name = "succeededNonGraceful")
+  public Object[][] nonGracefulCurrentState() {
+    return testCaseConfigListToObjectArray(config.succeededNonGraceful);
+  }
 
-    try {
-      ObjectReader mapReader = new ObjectMapper().reader(Map.class);
-      Map<String, Object> inputMaps = mapReader.readValue(inputStream);
-
-      List<Map<String, Object>> inputs = (List<Map<String, Object>>) inputMaps.get(inputEntry);
-      inputData = new Object[inputs.size()][];
-      for (int i = 0; i < inputData.length; i++) {
-        Map<String, Map<String, String>> intialCurrentStates =
-            (Map<String, Map<String, String>>) inputs.get(i).get(INITIAL_CURRENT_STATES);
-        Map<String, Map<String, String>> missingTopStates =
-            (Map<String, Map<String, String>>) inputs.get(i).get(MISSING_TOP_STATES);
-        Map<String, Map<String, String>> handoffCurrentStates =
-            (Map<String, Map<String, String>>) inputs.get(i).get(HANDOFF_CURRENT_STATES);
-        Long expectedDuration = Long.parseLong((String) inputs.get(i).get(EXPECTED_DURATION));
-
-        inputData[i] = new Object[] { intialCurrentStates, missingTopStates, handoffCurrentStates,
-            expectedDuration
-        };
-      }
-    } catch (IOException e) {
-      e.printStackTrace();
+  private Object[][] testCaseConfigListToObjectArray(List<TestCaseConfig> configs) {
+    Object[][] result = new Object[configs.size()][];
+    for (int i = 0; i < configs.size(); i++) {
+      result[i] = new Object[] {configs.get(i)};
     }
+    return result;
+  }
 
-    return inputData;
+  private void runTestWithNoInjection(TestCaseConfig cfg, boolean expectFail) {
+    preSetup();
+    Range<Long> duration = Range.closed(cfg.duration, cfg.duration);
+    Range<Long> expectedDuration = cfg.isGraceful ? duration : DURATION_ZERO;
+    Range<Long> expectedNonGracefulDuration = cfg.isGraceful ? DURATION_ZERO : duration;
+    Range<Long> expectedUserLatency =
+        cfg.isGraceful ? Range.closed(cfg.userLatency, cfg.userLatency) : DURATION_ZERO;
+    runStageAndVerify(cfg.initialCurrentStates, cfg.currentStateWithMissingTopState,
+        cfg.finalCurrentState, null, expectFail ? 0 : 1, expectFail ? 1 : 0, expectedDuration, expectedNonGracefulDuration,
+        expectedDuration, expectedUserLatency);
   }
 
   private Map<String, CurrentState> generateCurrentStateMap(
-      Map<String, Map<String, String>> currentStateRawData) {
+      Map<String, CurrentStateInfo> currentStateRawData) {
     Map<String, CurrentState> currentStateMap = new HashMap<String, CurrentState>();
     for (String instanceName : currentStateRawData.keySet()) {
-      Map<String, String> propertyMap = currentStateRawData.get(instanceName);
+      CurrentStateInfo info = currentStateRawData.get(instanceName);
       CurrentState currentState = new CurrentState(TEST_RESOURCE);
       currentState.setSessionId(SESSION_PREFIX + instanceName.split("_")[1]);
-      currentState.setState(PARTITION, propertyMap.get(CURRENT_STATE));
-      currentState.setPreviousState(PARTITION, propertyMap.get(PREVIOUS_STATE));
-      currentState.setStartTime(PARTITION, Long.parseLong(propertyMap.get(START_TIME)));
-      currentState.setEndTime(PARTITION, Long.parseLong(propertyMap.get(END_TIME)));
+      currentState.setState(PARTITION, info.currentState);
+      currentState.setPreviousState(PARTITION, info.previousState);
+      currentState.setStartTime(PARTITION, info.startTime);
+      currentState.setEndTime(PARTITION, info.endTime);
       currentStateMap.put(instanceName, currentState);
     }
     return currentStateMap;
   }
 
-  private void runCurrentStage(Map<String, Map<String, String>> initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates,
+  private void runPipeLine(Map<String, CurrentStateInfo> initialCurrentStates,
+      Map<String, CurrentStateInfo> missingTopStates,
+      Map<String, CurrentStateInfo> handOffCurrentStates,
       MissingStatesDataCacheInject testInjection) {
 
     if (initialCurrentStates != null && !initialCurrentStates.isEmpty()) {
-      setupCurrentStates(generateCurrentStateMap(initialCurrentStates));
-      runStage(event, new ReadClusterDataStage());
-      runStage(event, new CurrentStateComputationStage());
+      doRunStages(initialCurrentStates, null);
     }
 
     if (missingTopStates != null && !missingTopStates.isEmpty()) {
-      setupCurrentStates(generateCurrentStateMap(missingTopStates));
-      runStage(event, new ReadClusterDataStage());
-      if (testInjection != null) {
-        ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
-        testInjection.doInject(cache);
-      }
-      runStage(event, new CurrentStateComputationStage());
+      doRunStages(missingTopStates, testInjection);
     }
 
     if (handOffCurrentStates != null && !handOffCurrentStates.isEmpty()) {
-      setupCurrentStates(generateCurrentStateMap(handOffCurrentStates));
-      runStage(event, new ReadClusterDataStage());
-      runStage(event, new CurrentStateComputationStage());
+      doRunStages(handOffCurrentStates, null);
+    }
+  }
+
+  private void doRunStages(Map<String, CurrentStateInfo> currentStates,
+      MissingStatesDataCacheInject clusterDataInjection) {
+    setupCurrentStates(generateCurrentStateMap(currentStates));
+    runStage(event, new ReadClusterDataStage());
+    if (clusterDataInjection != null) {
+      ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+      clusterDataInjection.doInject(cache);
     }
+    runStage(event, new CurrentStateComputationStage());
+    runStage(event, new TopStateHandoffReportStage());
   }
 
   private void runStageAndVerify(
-      Map<String, Map<String, String>> initialCurrentStates,
-      Map<String, Map<String, String>> missingTopStates,
-      Map<String, Map<String, String>> handOffCurrentStates, MissingStatesDataCacheInject inject,
-      int successCnt, int failCnt,
-      Range<Long> expectedDuration, Range<Long> expectedMaxDuration
+      Map<String, CurrentStateInfo> initialCurrentStates,
+      Map<String, CurrentStateInfo> missingTopStates,
+      Map<String, CurrentStateInfo> handOffCurrentStates,
+      MissingStatesDataCacheInject inject,
+      int successCnt,
+      int failCnt,
+      Range<Long> expectedDuration,
+      Range<Long> expectedNonGracefulDuration,
+      Range<Long> expectedMaxDuration,
+      Range<Long> expectedUserLatency
   ) {
-    runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates, inject);
+    runPipeLine(initialCurrentStates, missingTopStates, handOffCurrentStates, inject);
     ClusterStatusMonitor clusterStatusMonitor =
         event.getAttribute(AttributeName.clusterStatusMonitor.name());
     ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
@@ -345,10 +420,17 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
     Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), successCnt);
     Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), failCnt);
 
-    Assert.assertTrue(
-        expectedDuration.contains(monitor.getSuccessfulTopStateHandoffDurationCounter()));
-    Assert.assertTrue(
-        expectedMaxDuration.contains(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge()));
+    long graceful = monitor.getPartitionTopStateHandoffDurationGauge()
+        .getAttributeValue(GRACEFUL_HANDOFF_DURATION).longValue();
+    long nonGraceful = monitor.getPartitionTopStateNonGracefulHandoffDurationGauge()
+        .getAttributeValue(NON_GRACEFUL_HANDOFF_DURATION).longValue();
+    long user = monitor.getPartitionTopStateHandoffUserLatencyGauge()
+        .getAttributeValue(HANDOFF_USER_LATENCY).longValue();
+    long max = monitor.getMaxSinglePartitionTopStateHandoffDurationGauge();
+    Assert.assertTrue(expectedDuration.contains(graceful));
+    Assert.assertTrue(expectedNonGracefulDuration.contains(nonGraceful));
+    Assert.assertTrue(expectedUserLatency.contains(user));
+    Assert.assertTrue(expectedMaxDuration.contains(max));
   }
 
   interface MissingStatesDataCacheInject {

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
----------------------------------------------------------------------
diff --git a/helix-core/src/test/resources/TestTopStateHandoffMetrics.json b/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
index d296e6c..0b39a1d 100644
--- a/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
+++ b/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
@@ -1,306 +1,371 @@
 {
   "succeeded": [
     {
-      "initialCurrentStates": {
+      "InitialCurrentStates": {
         "localhost_0": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "10000",
-          "EndTime": "11000"
+          "StartTime": 10000,
+          "EndTime": 11000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
       "MissingTopStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "15000",
-          "EndTime": "18000"
+          "StartTime": 15000,
+          "EndTime": 18000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "handoffCurrentStates": {
+      "HandoffCurrentStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "15000",
-          "EndTime": "18000"
+          "StartTime": 15000,
+          "EndTime": 18000
         },
         "localhost_1": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "20000",
-          "EndTime": "22000"
+          "StartTime": 20000,
+          "EndTime": 22000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "expectedDuration": "7000"
+      "Duration": 7000,
+      "UserLatency": 5000,
+      "IsGraceful": true
     },
     {
-      "initialCurrentStates": {
+      "InitialCurrentStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "1000",
-          "EndTime": "2000"
+          "StartTime": 1000,
+          "EndTime": 2000
         },
         "localhost_1": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "2500",
-          "EndTime": "5000"
+          "StartTime": 2500,
+          "EndTime": 5000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
       "MissingTopStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "1000",
-          "EndTime": "2000"
+          "StartTime": 1000,
+          "EndTime": 2000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "handoffCurrentStates": {
+      "HandoffCurrentStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "1000",
-          "EndTime": "2000"
+          "StartTime": 1000,
+          "EndTime": 2000
         },
         "localhost_1": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "20000",
-          "EndTime": "22000"
+          "StartTime": 20000,
+          "EndTime": 22000
         },
         "localhost_2": {
           "CurrentState": "ERROR",
           "PreviousState": "SLAVE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "expectedDuration": "14000"
+      "Duration": 14000,
+      "UserLatency": 4000,
+      "IsGraceful": true
+    }
+  ],
+  "succeededNonGraceful": [
+    {
+      "InitialCurrentStates": {
+        "localhost_0": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": 10000,
+          "EndTime": 11000
+        },
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": 8000,
+          "EndTime": 10000
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": 8000,
+          "EndTime": 10000
+        }
+      },
+      "MissingTopStates": {
+        "localhost_1": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": 8000,
+          "EndTime": 10000
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": 8000,
+          "EndTime": 10000
+        }
+      },
+      "HandoffCurrentStates": {
+        "localhost_1": {
+          "CurrentState": "MASTER",
+          "PreviousState": "SLAVE",
+          "StartTime": 20000,
+          "EndTime": 22000
+        },
+        "localhost_2": {
+          "CurrentState": "SLAVE",
+          "PreviousState": "OFFLINE",
+          "StartTime": 8000,
+          "EndTime": 10000
+        }
+      },
+      "Duration": 7000,
+      "UserLatency": 5000,
+      "IsGraceful": false
     }
   ],
   "failed": [
     {
-      "initialCurrentStates": {
+      "InitialCurrentStates": {
         "localhost_0": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "10000",
-          "EndTime": "11000"
+          "StartTime": 10000,
+          "EndTime": 11000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
       "MissingTopStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "15000",
-          "EndTime": "18000"
+          "StartTime": 15000,
+          "EndTime": 18000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "handoffCurrentStates": {
+      "HandoffCurrentStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "15000",
-          "EndTime": "18000"
+          "StartTime": 15000,
+          "EndTime": 18000
         },
         "localhost_1": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "20000",
-          "EndTime": "22000"
+          "StartTime": 20000,
+          "EndTime": 22000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "expectedDuration": "0"
+      "Duration": 0,
+      "UserLatency": 0,
+      "IsGraceful": false
     },
     {
-      "initialCurrentStates": {
+      "InitialCurrentStates": {
         "localhost_0": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "10000",
-          "EndTime": "11000"
+          "StartTime": 10000,
+          "EndTime": 11000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
       "MissingTopStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "15000",
-          "EndTime": "18000"
+          "StartTime": 15000,
+          "EndTime": 18000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "handoffCurrentStates": {
+      "HandoffCurrentStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "15000",
-          "EndTime": "18000"
+          "StartTime": 15000,
+          "EndTime": 18000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "20000",
-          "EndTime": "22000"
+          "StartTime": 20000,
+          "EndTime": 22000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "8000",
-          "EndTime": "10000"
+          "StartTime": 8000,
+          "EndTime": 10000
         }
       },
-      "expectedDuration": "0"
+      "Duration": 0,
+      "UserLatency": 0,
+      "IsGraceful": false
     }
   ],
   "fast": [
     {
-      "initialCurrentStates": {
+      "InitialCurrentStates": {
         "localhost_0": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "1000",
-          "EndTime": "2000"
+          "StartTime": 1000,
+          "EndTime": 2000
         },
         "localhost_1": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "2500",
-          "EndTime": "5000"
+          "StartTime": 2500,
+          "EndTime": 5000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "1000",
-          "EndTime": "2000"
+          "StartTime": 1000,
+          "EndTime": 2000
         }
       },
       "MissingTopStates": {
         "localhost_0": {
           "CurrentState": "MASTER",
           "PreviousState": "SLAVE",
-          "StartTime": "8000",
-          "EndTime": "9000"
+          "StartTime": 8000,
+          "EndTime": 9000
         },
         "localhost_1": {
           "CurrentState": "SLAVE",
           "PreviousState": "MASTER",
-          "StartTime": "6000",
-          "EndTime": "7000"
+          "StartTime": 6000,
+          "EndTime": 7000
         },
         "localhost_2": {
           "CurrentState": "SLAVE",
           "PreviousState": "OFFLINE",
-          "StartTime": "1000",
-          "EndTime": "2000"
+          "StartTime": 1000,
+          "EndTime": 2000
         }
       },
-      "handoffCurrentStates": {
+      "HandoffCurrentStates": {
 
       },
-      "expectedDuration": "3000"
+      "Duration": 3000,
+      "UserLatency": 2000,
+      "IsGraceful": true
     }
   ]
 }


[2/2] helix git commit: [HELIX-771] More detailed top state handoff metrics

Posted by jx...@apache.org.
[HELIX-771] More detailed top state handoff metrics


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7e49f995
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7e49f995
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7e49f995

Branch: refs/heads/master
Commit: 7e49f995e29ea200fcc42ce6af148ed521979f5c
Parents: 9d7364d
Author: Harry Zhang <hr...@linkedin.com>
Authored: Tue Oct 30 15:55:20 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Tue Oct 30 15:58:45 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |   3 +-
 .../controller/stages/ClusterDataCache.java     |   4 +-
 .../stages/CurrentStateComputationStage.java    | 399 ---------------
 .../stages/MissingTopStateRecord.java           |  58 +++
 .../stages/TaskGarbageCollectionStage.java      |   8 +
 .../stages/TopStateHandoffReportStage.java      | 506 +++++++++++++++++++
 .../monitoring/mbeans/ClusterStatusMonitor.java |  37 +-
 .../monitoring/mbeans/ResourceMonitor.java      |  77 ++-
 .../java/org/apache/helix/task/TaskDriver.java  |   2 +-
 .../task/TestIndependentTaskRebalancer.java     |  60 ++-
 .../mbeans/TestTopStateHandoffMetrics.java      | 418 +++++++++------
 .../resources/TestTopStateHandoffMetrics.json   | 263 ++++++----
 12 files changed, 1103 insertions(+), 732 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 800a331..dd409e5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -269,6 +269,7 @@ public class GenericHelixController implements IdealStateChangeListener,
       dataPreprocess.addStage(new ResourceComputationStage());
       dataPreprocess.addStage(new ResourceValidationStage());
       dataPreprocess.addStage(new CurrentStateComputationStage());
+      dataPreprocess.addStage(new TopStateHandoffReportStage());
 
       // rebalance pipeline
       Pipeline rebalancePipeline = new Pipeline(pipelineName);
@@ -381,7 +382,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     _taskEventThread = new ClusterEventProcessor(_taskCache, _taskEventQueue);
 
     _forceRebalanceTimer = new Timer();
-    _lastPipelineEndTimestamp = CurrentStateComputationStage.NOT_RECORDED;
+    _lastPipelineEndTimestamp = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
 
     initializeAsyncFIFOWorkers();
     initPipelines(_eventThread, _cache, false);

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index e1a374e..6de6d51 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -85,7 +85,7 @@ public class ClusterDataCache extends AbstractDataCache {
   private Map<String, ResourceConfig> _resourceConfigCacheMap;
   private Map<String, ClusterConstraints> _constraintMap;
   private Map<String, Map<String, String>> _idealStateRuleMap;
-  private Map<String, Map<String, Long>> _missingTopStateMap = new HashMap<>();
+  private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap = new HashMap<>();
   private Map<String, Map<String, String>> _lastTopStateLocationMap = new HashMap<>();
   private Map<String, ExternalView> _targetExternalViewMap = new HashMap<>();
   private Map<String, ExternalView> _externalViewMap = new HashMap<>();
@@ -678,7 +678,7 @@ public class ClusterDataCache extends AbstractDataCache {
     return null;
   }
 
-  public Map<String, Map<String, Long>> getMissingTopStateMap() {
+  public Map<String, Map<String, MissingTopStateRecord>> getMissingTopStateMap() {
     return _missingTopStateMap;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 0713070..d4927ec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.helix.controller.LogUtil;
@@ -28,7 +27,6 @@ import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.model.*;
 import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,16 +39,10 @@ import org.slf4j.LoggerFactory;
 public class CurrentStateComputationStage extends AbstractBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(CurrentStateComputationStage.class);
 
-  public static final long NOT_RECORDED = -1L;
-  public static final long TRANSITION_FAILED = -2L;
-  public static final String TASK_STATE_MODEL_NAME = "Task";
-
   @Override
   public void process(ClusterEvent event) throws Exception {
     _eventId = event.getEventId();
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
-    final Long lastPipelineFinishTimestamp = event
-        .getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(), NOT_RECORDED);
     final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
 
     if (cache == null || resourceMap == null) {
@@ -75,14 +67,6 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
           instanceSessionId);
       updateCurrentStates(instance, currentStateMap.values(), currentStateOutput, resourceMap);
     }
-
-    if (!cache.isTaskCache()) {
-      ClusterStatusMonitor clusterStatusMonitor =
-          event.getAttribute(AttributeName.clusterStatusMonitor.name());
-      // TODO Update the status async -- jjwang
-      updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput,
-          lastPipelineFinishTimestamp);
-    }
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
   }
 
@@ -219,387 +203,4 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
       currentStateOutput.setCancellationMessage(resourceName, partition, instanceName, message);
     }
   }
-
-  private void updateTopStateStatus(ClusterDataCache cache,
-      ClusterStatusMonitor clusterStatusMonitor, Map<String, Resource> resourceMap,
-      CurrentStateOutput currentStateOutput,
-      long lastPipelineFinishTimestamp) {
-    Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
-    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
-
-    long durationThreshold = Long.MAX_VALUE;
-    if (cache.getClusterConfig() != null) {
-      durationThreshold = cache.getClusterConfig().getMissTopStateDurationThreshold();
-    }
-
-    // Remove any resource records that no longer exists
-    missingTopStateMap.keySet().retainAll(resourceMap.keySet());
-    lastTopStateMap.keySet().retainAll(resourceMap.keySet());
-
-    for (Resource resource : resourceMap.values()) {
-      StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
-      if (stateModelDef == null || resource.getStateModelDefRef()
-          .equalsIgnoreCase(TASK_STATE_MODEL_NAME)) {
-        // Resource does not have valid statemodel or it is task state model
-        continue;
-      }
-
-      String resourceName = resource.getResourceName();
-
-      for (Partition partition : resource.getPartitions()) {
-        String currentTopStateInstance =
-            findCurrentTopStateLocation(currentStateOutput, resourceName, partition, stateModelDef);
-        String lastTopStateInstance = findCachedTopStateLocation(cache, resourceName, partition);
-
-        if (currentTopStateInstance != null) {
-          reportTopStateExistance(cache, currentStateOutput, stateModelDef, resourceName, partition,
-              lastTopStateInstance, currentTopStateInstance, clusterStatusMonitor,
-              durationThreshold, lastPipelineFinishTimestamp);
-          updateCachedTopStateLocation(cache, resourceName, partition, currentTopStateInstance);
-        } else {
-          reportTopStateMissing(cache, missingTopStateMap, lastTopStateMap, resourceName,
-              partition, stateModelDef.getTopState(), currentStateOutput);
-          reportTopStateHandoffFailIfNecessary(cache, resourceName, partition, durationThreshold,
-              clusterStatusMonitor);
-        }
-      }
-    }
-
-    if (clusterStatusMonitor != null) {
-      clusterStatusMonitor.resetMaxMissingTopStateGauge();
-    }
-  }
-
-  /**
-   * From current state output, find out the location of the top state of given resource
-   * and partition
-   *
-   * @param currentStateOutput current state output
-   * @param resourceName resource name
-   * @param partition partition of the resource
-   * @param stateModelDef state model def object
-   * @return name of the node that contains top state, null if there is not top state recorded
-   */
-  private String findCurrentTopStateLocation(CurrentStateOutput currentStateOutput,
-      String resourceName, Partition partition, StateModelDefinition stateModelDef) {
-    Map<String, String> stateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
-    for (String instance : stateMap.keySet()) {
-      if (stateMap.get(instance).equals(stateModelDef.getTopState())) {
-        return instance;
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Find cached top state location of the given resource and partition
-   *
-   * @param cache cluster data cache object
-   * @param resourceName resource name
-   * @param partition partition of the given resource
-   * @return cached name of the node that contains top state, null if not previously cached
-   */
-  private String findCachedTopStateLocation(ClusterDataCache cache, String resourceName, Partition partition) {
-    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
-    return lastTopStateMap.containsKey(resourceName) && lastTopStateMap.get(resourceName)
-        .containsKey(partition.getPartitionName()) ? lastTopStateMap.get(resourceName)
-        .get(partition.getPartitionName()) : null;
-  }
-
-  /**
-   * Update top state location cache of the given resource and partition
-   *
-   * @param cache cluster data cache object
-   * @param resourceName resource name
-   * @param partition partition of the given resource
-   * @param currentTopStateInstance name of the instance that currently has the top state
-   */
-  private void updateCachedTopStateLocation(ClusterDataCache cache, String resourceName,
-      Partition partition, String currentTopStateInstance) {
-    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
-    if (!lastTopStateMap.containsKey(resourceName)) {
-      lastTopStateMap.put(resourceName, new HashMap<String, String>());
-    }
-    lastTopStateMap.get(resourceName).put(partition.getPartitionName(), currentTopStateInstance);
-  }
-
-  /**
-   * When we observe a top state of a given resource and partition, we need to report for the
-   * following 2 scenarios:
-   *  1. This is a top state come back, i.e. we have a previously missing top state record
-   *  2. Top state location change, i.e. current top state location is different from what
-   *     we saw previously
-   *
-   * @param cache cluster data cache
-   * @param currentStateOutput generated after computiting current state
-   * @param stateModelDef State model definition object of the given resource
-   * @param resourceName resource name
-   * @param partition partition of the given resource
-   * @param lastTopStateInstance our cached top state location
-   * @param currentTopStateInstance top state location we observed during this pipeline run
-   * @param clusterStatusMonitor monitor object
-   * @param durationThreshold top state handoff duration threshold
-   * @param lastPipelineFinishTimestamp timestamp when last pipeline run finished
-   */
-  private void reportTopStateExistance(ClusterDataCache cache, CurrentStateOutput currentStateOutput,
-      StateModelDefinition stateModelDef, String resourceName, Partition partition,
-      String lastTopStateInstance, String currentTopStateInstance,
-      ClusterStatusMonitor clusterStatusMonitor, long durationThreshold,
-      long lastPipelineFinishTimestamp) {
-
-    Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
-
-    if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
-        .containsKey(partition.getPartitionName())) {
-      // We previously recorded a top state missing, and it's not coming back
-      reportTopStateComesBack(cache, currentStateOutput.getCurrentStateMap(resourceName, partition),
-          missingTopStateMap, resourceName, partition, clusterStatusMonitor, durationThreshold,
-          stateModelDef.getTopState());
-    } else if (lastTopStateInstance != null && !lastTopStateInstance
-        .equals(currentTopStateInstance)) {
-      // With no missing top state record, but top state instance changed,
-      // we observed an entire top state handoff process
-      reportSingleTopStateHandoff(cache, lastTopStateInstance, currentTopStateInstance,
-          resourceName, partition, clusterStatusMonitor, lastPipelineFinishTimestamp);
-    } else {
-      // else, there is not top state change, or top state first came up, do nothing
-      LogUtil.logDebug(LOG, _eventId, String.format(
-          "No top state hand off or first-seen top state for %s. CurNode: %s, LastNode: %s.",
-          partition.getPartitionName(), currentTopStateInstance, lastTopStateInstance));
-    }
-  }
-
-  /**
-   * This function calculates duration of a full top state handoff, observed in 1 pipeline run,
-   * i.e. current top state instance loaded from ZK is different than the one we cached during
-   * last pipeline run.
-   *
-   * @param cache ClusterDataCache
-   * @param lastTopStateInstance Name of last top state instance we cached
-   * @param curTopStateInstance Name of current top state instance we refreshed from ZK
-   * @param resourceName resource name
-   * @param partition partition object
-   * @param clusterStatusMonitor cluster state monitor object
-   * @param lastPipelineFinishTimestamp last pipeline run finish timestamp
-   */
-  private void reportSingleTopStateHandoff(ClusterDataCache cache, String lastTopStateInstance,
-      String curTopStateInstance, String resourceName, Partition partition,
-      ClusterStatusMonitor clusterStatusMonitor, long lastPipelineFinishTimestamp) {
-    if (curTopStateInstance.equals(lastTopStateInstance)) {
-      return;
-    }
-
-    // Current state output generation logic guarantees that current top state instance
-    // must be a live instance
-    String curTopStateSession = cache.getLiveInstances().get(curTopStateInstance).getSessionId();
-    long endTime =
-        cache.getCurrentState(curTopStateInstance, curTopStateSession).get(resourceName)
-            .getEndTime(partition.getPartitionName());
-
-    long startTime = NOT_RECORDED;
-    if (cache.getLiveInstances().containsKey(lastTopStateInstance)) {
-      String lastTopStateSession =
-          cache.getLiveInstances().get(lastTopStateInstance).getSessionId();
-      // Make sure last top state instance has not bounced during cluster data cache refresh
-      // We need this null check as there are test cases creating incomplete current state
-      if (cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
-          != null) {
-        startTime =
-            cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
-                .getStartTime(partition.getPartitionName());
-      }
-    }
-    if (startTime == NOT_RECORDED) {
-      // either cached last top state instance is no longer alive, or it bounced during cluster
-      // data cache refresh, we use last pipeline run end time for best guess. Though we can
-      // calculate this number in a more precise way by refreshing data from ZK, given the rarity
-      // of this corner case, it's not worthy.
-      startTime = lastPipelineFinishTimestamp;
-    }
-
-    if (startTime == NOT_RECORDED || startTime > endTime) {
-      // Top state handoff finished before end of last pipeline run, and instance contains
-      // previous top state is no longer alive, so our best guess did not work, ignore the
-      // data point for now.
-      LogUtil.logWarn(LOG, _eventId, String
-          .format("Cannot confirm top state missing start time. %s:%s->%s. Likely it was very fast",
-              partition.getPartitionName(), lastTopStateInstance, curTopStateInstance));
-      return;
-    }
-
-    LogUtil.logInfo(LOG, _eventId, String.format("Missing topstate duration is %d for partition %s",
-        endTime - startTime, partition.getPartitionName()));
-    if (clusterStatusMonitor != null) {
-      clusterStatusMonitor
-          .updateMissingTopStateDurationStats(resourceName, endTime - startTime,
-              true);
-    }
-  }
-
-  /**
-   * Check if the given partition of the given resource has a missing top state duration larger
-   * than the threshold, if so, report a top state transition failure
-   *
-   * @param cache cluster data cache
-   * @param resourceName resource name
-   * @param partition partition of the given resource
-   * @param durationThreshold top state handoff duration threshold
-   * @param clusterStatusMonitor monitor object
-   */
-  private void reportTopStateHandoffFailIfNecessary(ClusterDataCache cache, String resourceName,
-      Partition partition, long durationThreshold, ClusterStatusMonitor clusterStatusMonitor) {
-    Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
-    String partitionName = partition.getPartitionName();
-    Long startTime = missingTopStateMap.get(resourceName).get(partitionName);
-    if (startTime != null && startTime > 0
-        && System.currentTimeMillis() - startTime > durationThreshold) {
-      missingTopStateMap.get(resourceName).put(partitionName, TRANSITION_FAILED);
-      if (clusterStatusMonitor != null) {
-        clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, false);
-      }
-    }
-  }
-
-  /**
-   * When we find a top state missing of the given partition, we find out when it started to miss
-   * top state, then we record it in cache
-   *
-   * @param cache cluster data cache
-   * @param missingTopStateMap missing top state record
-   * @param lastTopStateMap our cached last top state locations
-   * @param resourceName resource name
-   * @param partition partition of the given resource
-   * @param topState top state name
-   * @param currentStateOutput current state output
-   */
-  private void reportTopStateMissing(ClusterDataCache cache,
-      Map<String, Map<String, Long>> missingTopStateMap,
-      Map<String, Map<String, String>> lastTopStateMap, String resourceName, Partition partition,
-      String topState, CurrentStateOutput currentStateOutput) {
-    if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
-        .containsKey(partition.getPartitionName())) {
-      // a previous missing has been already recorded
-      return;
-    }
-
-    long startTime = NOT_RECORDED;
-
-    // 1. try to find the previous topstate missing event for the startTime.
-    String missingStateInstance = null;
-    if (lastTopStateMap.containsKey(resourceName)) {
-      missingStateInstance = lastTopStateMap.get(resourceName).get(partition.getPartitionName());
-    }
-
-    if (missingStateInstance != null) {
-      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-      if (liveInstances.containsKey(missingStateInstance)) {
-        CurrentState currentState = cache.getCurrentState(missingStateInstance,
-            liveInstances.get(missingStateInstance).getSessionId()).get(resourceName);
-
-        if (currentState != null
-            && currentState.getPreviousState(partition.getPartitionName()) != null && currentState
-            .getPreviousState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
-          // Update the latest start time only from top state to other state transition
-          // At beginning, the start time should -1 (not recorded). If something happen either
-          // instance not alive or the instance just started for that partition, Helix does not know
-          // the previous start time or end time. So we count from current.
-          //
-          // Previous state is top state does not mean that resource has only one top state
-          // (i.e. Online/Offline). So Helix has to find the latest start time as the staring point.
-          startTime = Math.max(startTime, currentState.getStartTime(partition.getPartitionName()));
-        } // Else no related state transition history found, use current time as the missing start time.
-      } else {
-        // If the previous topState holder is no longer alive, the offline time is used as start time.
-        Map<String, Long> offlineMap = cache.getInstanceOfflineTimeMap();
-        if (offlineMap.containsKey(missingStateInstance)) {
-          startTime = Math.max(startTime, offlineMap.get(missingStateInstance));
-        }
-      }
-    }
-
-    // 2. if no previous topstate records, use any pending message that are created for topstate transition
-    if (startTime == NOT_RECORDED) {
-      for (Message message : currentStateOutput.getPendingMessageMap(resourceName, partition)
-          .values()) {
-        // Only messages that match the current session ID will be recorded in the map.
-        // So no need to redundantly check here.
-        if (message.getToState().equals(topState)) {
-          startTime = Math.max(startTime, message.getCreateTimeStamp());
-        }
-      }
-    }
-
-    // 3. if no clue about previous topstate or any related pending message, use the current system time.
-    if (startTime == NOT_RECORDED) {
-      LogUtil.logWarn(LOG, _eventId,
-          "Cannot confirm top state missing start time. Use the current system time as the start time.");
-      startTime = System.currentTimeMillis();
-    }
-
-    if (!missingTopStateMap.containsKey(resourceName)) {
-      missingTopStateMap.put(resourceName, new HashMap<String, Long>());
-    }
-
-    Map<String, Long> partitionMap = missingTopStateMap.get(resourceName);
-    // Update the new partition without top state
-    if (!partitionMap.containsKey(partition.getPartitionName())) {
-      partitionMap.put(partition.getPartitionName(), startTime);
-    }
-  }
-
-  /**
-   * When we see a top state come back, i.e. we observe a top state in this pipeline run,
-   * but have a top state missing record before, we need to remove the top state missing
-   * record and report top state handoff duration
-   *
-   * @param cache cluster data cache
-   * @param stateMap state map of the given partition of the given resource
-   * @param missingTopStateMap missing top state record
-   * @param resourceName resource name
-   * @param partition partition of the resource
-   * @param clusterStatusMonitor monitor object
-   * @param threshold top state handoff threshold
-   * @param topState name of the top state
-   */
-  private void reportTopStateComesBack(ClusterDataCache cache, Map<String, String> stateMap,
-      Map<String, Map<String, Long>> missingTopStateMap, String resourceName, Partition partition,
-      ClusterStatusMonitor clusterStatusMonitor, long threshold, String topState) {
-
-    long handOffStartTime = missingTopStateMap.get(resourceName).get(partition.getPartitionName());
-
-    // Find the earliest end time from the top states
-    long handOffEndTime = System.currentTimeMillis();
-    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
-    for (String instanceName : stateMap.keySet()) {
-      CurrentState currentState =
-          cache.getCurrentState(instanceName, liveInstances.get(instanceName).getSessionId())
-              .get(resourceName);
-      if (currentState.getState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
-        handOffEndTime =
-            Math.min(handOffEndTime, currentState.getEndTime(partition.getPartitionName()));
-      }
-    }
-
-    if (handOffStartTime > 0 && handOffEndTime - handOffStartTime <= threshold) {
-      LogUtil.logInfo(LOG, _eventId, String.format("Missing topstate duration is %d for partition %s",
-          handOffEndTime - handOffStartTime, partition.getPartitionName()));
-      if (clusterStatusMonitor != null) {
-        clusterStatusMonitor
-            .updateMissingTopStateDurationStats(resourceName, handOffEndTime - handOffStartTime,
-                true);
-      }
-    }
-    removeFromStatsMap(missingTopStateMap, resourceName, partition);
-  }
-
-  private void removeFromStatsMap(Map<String, Map<String, Long>> missingTopStateMap,
-      String resourceName, Partition partition) {
-    if (missingTopStateMap.containsKey(resourceName)) {
-      missingTopStateMap.get(resourceName).remove(partition.getPartitionName());
-    }
-
-    if (missingTopStateMap.get(resourceName).size() == 0) {
-      missingTopStateMap.remove(resourceName);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java
new file mode 100644
index 0000000..3aaf326
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java
@@ -0,0 +1,58 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A record entry in cluster data cache containing information about a partition's
+ * missing top state
+ */
+public class MissingTopStateRecord {
+  private final boolean isGracefulHandoff;
+  private final long startTimeStamp;
+  private final long userLatency;
+  private boolean failed;
+
+  public MissingTopStateRecord(long start, long user, boolean graceful) {
+    isGracefulHandoff = graceful;
+    startTimeStamp = start;
+    userLatency = user;
+    failed = false;
+  }
+
+  /* package */ boolean isGracefulHandoff() {
+    return isGracefulHandoff;
+  }
+
+  /* package */ long getStartTimeStamp() {
+    return startTimeStamp;
+  }
+
+  /* package */ long getUserLatency() {
+    return userLatency;
+  }
+
+  /* package */ void setFailed() {
+    failed = true;
+  }
+
+  /* package */ boolean isFailed() {
+    return failed;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 247d053..4f417fd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -24,6 +24,14 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
   public void execute(ClusterEvent event) {
     ClusterDataCache clusterDataCache = event.getAttribute(AttributeName.ClusterDataCache.name());
     HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+
+    if (clusterDataCache == null || manager == null) {
+      LOG.warn(
+          "ClusterDataCache or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+          event.getEventId(), event.getEventType(), event.getClusterName());
+      return;
+    }
+
     Set<WorkflowConfig> existingWorkflows =
         new HashSet<>(clusterDataCache.getWorkflowConfigMap().values());
     for (WorkflowConfig workflowConfig : existingWorkflows) {

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
new file mode 100644
index 0000000..699f2a8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
@@ -0,0 +1,506 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Observe top state handoff and report latency
+ * TODO: make this stage async
+ */
+public class TopStateHandoffReportStage extends AbstractBaseStage {
+  private static final long DEFAULT_HANDOFF_USER_LATENCY = 0L;
+  private static Logger LOG = LoggerFactory.getLogger(TopStateHandoffReportStage.class);
+  public static final long TIMESTAMP_NOT_RECORDED = -1L;
+
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
+    final ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    final Long lastPipelineFinishTimestamp = event
+        .getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(),
+            TIMESTAMP_NOT_RECORDED);
+    final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
+    final CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
+    final ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+
+    if (cache == null || resourceMap == null || currentStateOutput == null) {
+      throw new StageException(
+          "Missing critical attributes for stage, requires ClusterDataCache, RESOURCES and CURRENT_STATE");
+    }
+
+    if (cache.isTaskCache()) {
+      throw new StageException("TopStateHandoffReportStage can only be used in resource pipeline");
+    }
+
+    updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput,
+        lastPipelineFinishTimestamp);
+
+  }
+
+  private void updateTopStateStatus(ClusterDataCache cache,
+      ClusterStatusMonitor clusterStatusMonitor, Map<String, Resource> resourceMap,
+      CurrentStateOutput currentStateOutput,
+      long lastPipelineFinishTimestamp) {
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap =
+        cache.getMissingTopStateMap();
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+
+    long durationThreshold = Long.MAX_VALUE;
+    if (cache.getClusterConfig() != null) {
+      durationThreshold = cache.getClusterConfig().getMissTopStateDurationThreshold();
+    }
+
+    // Remove records of any resources that no longer exist
+    missingTopStateMap.keySet().retainAll(resourceMap.keySet());
+    lastTopStateMap.keySet().retainAll(resourceMap.keySet());
+
+    for (Resource resource : resourceMap.values()) {
+      StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
+      if (stateModelDef == null) {
+        // Resource does not have valid state model, just skip processing
+        continue;
+      }
+
+      String resourceName = resource.getResourceName();
+
+      for (Partition partition : resource.getPartitions()) {
+        String currentTopStateInstance =
+            findCurrentTopStateLocation(currentStateOutput, resourceName, partition, stateModelDef);
+        String lastTopStateInstance = findCachedTopStateLocation(cache, resourceName, partition);
+
+        if (currentTopStateInstance != null) {
+          reportTopStateExistence(cache, currentStateOutput, stateModelDef, resourceName, partition,
+              lastTopStateInstance, currentTopStateInstance, clusterStatusMonitor,
+              durationThreshold, lastPipelineFinishTimestamp);
+          updateCachedTopStateLocation(cache, resourceName, partition, currentTopStateInstance);
+        } else {
+          reportTopStateMissing(cache, resourceName,
+              partition, stateModelDef.getTopState(), currentStateOutput);
+          reportTopStateHandoffFailIfNecessary(cache, resourceName, partition, durationThreshold,
+              clusterStatusMonitor);
+        }
+      }
+    }
+
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.resetMaxMissingTopStateGauge();
+    }
+  }
+
+  /**
+   * From current state output, find out the location of the top state of given resource
+   * and partition
+   *
+   * @param currentStateOutput current state output
+   * @param resourceName resource name
+   * @param partition partition of the resource
+   * @param stateModelDef state model def object
+   * @return name of the node that contains top state, null if there is no top state recorded
+   */
+  private String findCurrentTopStateLocation(CurrentStateOutput currentStateOutput,
+      String resourceName, Partition partition, StateModelDefinition stateModelDef) {
+    Map<String, String> stateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
+    for (String instance : stateMap.keySet()) {
+      if (stateMap.get(instance).equals(stateModelDef.getTopState())) {
+        return instance;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Find cached top state location of the given resource and partition
+   *
+   * @param cache cluster data cache object
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @return cached name of the node that contains top state, null if not previously cached
+   */
+  private String findCachedTopStateLocation(ClusterDataCache cache, String resourceName, Partition partition) {
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+    return lastTopStateMap.containsKey(resourceName) && lastTopStateMap.get(resourceName)
+        .containsKey(partition.getPartitionName()) ? lastTopStateMap.get(resourceName)
+        .get(partition.getPartitionName()) : null;
+  }
+
+  /**
+   * Update top state location cache of the given resource and partition
+   *
+   * @param cache cluster data cache object
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param currentTopStateInstance name of the instance that currently has the top state
+   */
+  private void updateCachedTopStateLocation(ClusterDataCache cache, String resourceName,
+      Partition partition, String currentTopStateInstance) {
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+    if (!lastTopStateMap.containsKey(resourceName)) {
+      lastTopStateMap.put(resourceName, new HashMap<String, String>());
+    }
+    lastTopStateMap.get(resourceName).put(partition.getPartitionName(), currentTopStateInstance);
+  }
+
+  /**
+   * When we observe a top state of a given resource and partition, we need to report for the
+   * following 2 scenarios:
+   *  1. This is a top state come back, i.e. we have a previously missing top state record
+   *  2. Top state location change, i.e. current top state location is different from what
+   *     we saw previously
+   *
+   * @param cache cluster data cache
+   * @param currentStateOutput generated after computing current state
+   * @param stateModelDef State model definition object of the given resource
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param lastTopStateInstance our cached top state location
+   * @param currentTopStateInstance top state location we observed during this pipeline run
+   * @param clusterStatusMonitor monitor object
+   * @param durationThreshold top state handoff duration threshold
+   * @param lastPipelineFinishTimestamp timestamp when last pipeline run finished
+   */
+  private void reportTopStateExistence(ClusterDataCache cache, CurrentStateOutput currentStateOutput,
+      StateModelDefinition stateModelDef, String resourceName, Partition partition,
+      String lastTopStateInstance, String currentTopStateInstance,
+      ClusterStatusMonitor clusterStatusMonitor, long durationThreshold,
+      long lastPipelineFinishTimestamp) {
+
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap =
+        cache.getMissingTopStateMap();
+
+    if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
+        .containsKey(partition.getPartitionName())) {
+      // We previously recorded a missing top state, and it has now come back
+      reportTopStateComesBack(cache, currentStateOutput.getCurrentStateMap(resourceName, partition),
+          resourceName, partition, clusterStatusMonitor, durationThreshold,
+          stateModelDef.getTopState());
+    } else if (lastTopStateInstance != null && !lastTopStateInstance
+        .equals(currentTopStateInstance)) {
+      // With no missing top state record, but top state instance changed,
+      // we observed an entire top state handoff process
+      reportSingleTopStateHandoff(cache, lastTopStateInstance, currentTopStateInstance,
+          resourceName, partition, clusterStatusMonitor, lastPipelineFinishTimestamp);
+    } else {
+      // else, there is no top state change, or top state first came up, do nothing
+      LogUtil.logDebug(LOG, _eventId, String.format(
+          "No top state hand off or first-seen top state for %s. CurNode: %s, LastNode: %s.",
+          partition.getPartitionName(), currentTopStateInstance, lastTopStateInstance));
+    }
+  }
+
+  /**
+   * This function calculates duration of a full top state handoff, observed in 1 pipeline run,
+   * i.e. current top state instance loaded from ZK is different than the one we cached during
+   * last pipeline run.
+   *
+   * @param cache ClusterDataCache
+   * @param lastTopStateInstance Name of last top state instance we cached
+   * @param curTopStateInstance Name of current top state instance we refreshed from ZK
+   * @param resourceName resource name
+   * @param partition partition object
+   * @param clusterStatusMonitor cluster state monitor object
+   * @param lastPipelineFinishTimestamp last pipeline run finish timestamp
+   */
+  private void reportSingleTopStateHandoff(ClusterDataCache cache, String lastTopStateInstance,
+      String curTopStateInstance, String resourceName, Partition partition,
+      ClusterStatusMonitor clusterStatusMonitor, long lastPipelineFinishTimestamp) {
+    if (curTopStateInstance.equals(lastTopStateInstance)) {
+      return;
+    }
+
+    // Current state output generation logic guarantees that current top state instance
+    // must be a live instance
+    String curTopStateSession = cache.getLiveInstances().get(curTopStateInstance).getSessionId();
+    long endTime =
+        cache.getCurrentState(curTopStateInstance, curTopStateSession).get(resourceName)
+            .getEndTime(partition.getPartitionName());
+    long toTopStateuserLatency =
+        endTime - cache.getCurrentState(curTopStateInstance, curTopStateSession).get(resourceName)
+            .getStartTime(partition.getPartitionName());
+
+    long startTime = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
+    long fromTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+
+    // Make sure last top state instance has not bounced during cluster data cache refresh
+    if (cache.getLiveInstances().containsKey(lastTopStateInstance)) {
+      String lastTopStateSession =
+          cache.getLiveInstances().get(lastTopStateInstance).getSessionId();
+      // We need this null check as there are test cases creating incomplete current state
+      if (cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
+          != null) {
+        startTime =
+            cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
+                .getStartTime(partition.getPartitionName());
+        fromTopStateUserLatency =
+            cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
+                .getEndTime(partition.getPartitionName()) - startTime;
+      }
+    }
+    if (startTime == TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED) {
+      // Either the cached last top state instance is no longer alive, or it bounced during the
+      // cluster data cache refresh, so we use the last pipeline run end time as a best guess. We
+      // could calculate this more precisely by refreshing data from ZK, but given the rarity of
+      // this corner case, it is not worth the cost.
+      startTime = lastPipelineFinishTimestamp;
+      fromTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+    }
+
+    if (startTime == TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED || startTime > endTime) {
+      // Top state handoff finished before the end of the last pipeline run, and the instance
+      // that held the previous top state is no longer alive, so our best guess did not work;
+      // ignore the data point for now.
+      LogUtil.logWarn(LOG, _eventId, String
+          .format("Cannot confirm top state missing start time. %s:%s->%s. Likely it was very fast",
+              partition.getPartitionName(), lastTopStateInstance, curTopStateInstance));
+      return;
+    }
+
+    long duration = endTime - startTime;
+    long userLatency = fromTopStateUserLatency + toTopStateuserLatency;
+    // We always treat such top state handoff as graceful as if top state handoff is triggered
+    // by instance crash, we cannot observe the entire handoff process within 1 pipeline run
+    logMissingTopStateInfo(duration, userLatency, true, partition.getPartitionName());
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor
+          .updateMissingTopStateDurationStats(resourceName, duration, userLatency, true, true);
+    }
+  }
+
+  /**
+   * Check if the given partition of the given resource has a missing top state duration larger
+   * than the threshold, if so, report a top state transition failure
+   *
+   * @param cache cluster data cache
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param durationThreshold top state handoff duration threshold
+   * @param clusterStatusMonitor monitor object
+   */
+  private void reportTopStateHandoffFailIfNecessary(ClusterDataCache cache, String resourceName,
+      Partition partition, long durationThreshold, ClusterStatusMonitor clusterStatusMonitor) {
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap =
+        cache.getMissingTopStateMap();
+    String partitionName = partition.getPartitionName();
+    MissingTopStateRecord record = missingTopStateMap.get(resourceName).get(partitionName);
+    long startTime = record.getStartTimeStamp();
+    if (startTime > 0 && System.currentTimeMillis() - startTime > durationThreshold && !record
+        .isFailed()) {
+      record.setFailed();
+      missingTopStateMap.get(resourceName).put(partitionName, record);
+      if (clusterStatusMonitor != null) {
+        clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, 0L, false, false);
+      }
+    }
+  }
+
+  /**
+   * When we find a top state missing of the given partition, we find out when it started to miss
+   * top state, then we record it in cache
+   *
+   * @param cache cluster data cache
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param topState top state name
+   * @param currentStateOutput current state output
+   */
+  private void reportTopStateMissing(ClusterDataCache cache, String resourceName, Partition partition,
+      String topState, CurrentStateOutput currentStateOutput) {
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap = cache.getMissingTopStateMap();
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+    if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
+        .containsKey(partition.getPartitionName())) {
+      // a previous missing has been already recorded
+      return;
+    }
+
+    long startTime = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
+    long fromTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+    boolean isGraceful = true;
+
+    // 1. try to find the previous topstate missing event for the startTime.
+    String missingStateInstance = null;
+    if (lastTopStateMap.containsKey(resourceName)) {
+      missingStateInstance = lastTopStateMap.get(resourceName).get(partition.getPartitionName());
+    }
+
+    if (missingStateInstance != null) {
+      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+      if (liveInstances.containsKey(missingStateInstance)) {
+        CurrentState currentState = cache.getCurrentState(missingStateInstance,
+            liveInstances.get(missingStateInstance).getSessionId()).get(resourceName);
+
+        if (currentState != null
+            && currentState.getPreviousState(partition.getPartitionName()) != null && currentState
+            .getPreviousState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
+
+          // Update the latest start time only for a transition from top state to another state.
+          // Initially the start time is -1 (not recorded). If the instance is not alive, or it
+          // just started hosting that partition, Helix does not know the previous start time or
+          // end time, so we count from the current time.
+          //
+          // The previous state being top state does not mean the resource has only one top state
+          // (e.g. Online/Offline), so Helix has to use the latest start time as the starting point.
+          long fromTopStateStartTime = currentState.getStartTime(partition.getPartitionName());
+          if (fromTopStateStartTime > startTime) {
+            startTime = fromTopStateStartTime;
+            fromTopStateUserLatency =
+                currentState.getEndTime(partition.getPartitionName()) - startTime;
+          }
+          startTime = Math.max(startTime, currentState.getStartTime(partition.getPartitionName()));
+        } // Else no related state transition history found, use current time as the missing start time.
+      } else {
+        // If the previous topState holder is no longer alive, the offline time is used as start time.
+        // Also, if we observe a top state missing and the previous top state node is gone, the
+        // top state handoff is not graceful
+        isGraceful = false;
+        Map<String, Long> offlineMap = cache.getInstanceOfflineTimeMap();
+        if (offlineMap.containsKey(missingStateInstance)) {
+          startTime = Math.max(startTime, offlineMap.get(missingStateInstance));
+        }
+      }
+    }
+
+    // 2. if there are no previous top state records, either the resource was just created or
+    // there was a controller leadership change. Check any pending messages that were created for
+    // a top state transition. Assume this is a graceful top state handoff, because if the
+    // from-top-state instance had crashed, we would not be recording such a message
+    if (startTime == TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED) {
+      for (Message message : currentStateOutput.getPendingMessageMap(resourceName, partition)
+          .values()) {
+        // Only messages that match the current session ID will be recorded in the map.
+        // So no need to redundantly check here.
+        if (message.getToState().equals(topState)) {
+          startTime = Math.max(startTime, message.getCreateTimeStamp());
+        }
+      }
+    }
+
+    // 3. if no clue about previous top state or any related pending message, it could be
+    //    a. resource just created
+    //    b. controller leader switch (actual hand off could be either graceful or non graceful)
+    //
+    // Use the current system time as missing top state start time and assume always graceful
+    // TODO: revise this case and see if this case can be better addressed
+    if (startTime == TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED) {
+      LogUtil.logWarn(LOG, _eventId,
+          "Cannot confirm top state missing start time. Use the current system time as the start time.");
+      startTime = System.currentTimeMillis();
+    }
+
+    if (!missingTopStateMap.containsKey(resourceName)) {
+      missingTopStateMap.put(resourceName, new HashMap<String, MissingTopStateRecord>());
+    }
+
+    missingTopStateMap.get(resourceName).put(partition.getPartitionName(),
+        new MissingTopStateRecord(startTime, fromTopStateUserLatency, isGraceful));
+  }
+
+  /**
+   * Called when a top state is observed in this pipeline run for a partition that still has a
+   * missing top state record. Reports the handoff duration if the recorded start time is valid
+   * and the handoff finished within the threshold, then removes the record.
+   *
+   * @param cache cluster data cache
+   * @param stateMap state map of the given partition of the given resource
+   * @param resourceName resource name
+   * @param partition partition of the resource
+   * @param clusterStatusMonitor monitor object
+   * @param threshold top state handoff threshold
+   * @param topState name of the top state
+   */
+  private void reportTopStateComesBack(ClusterDataCache cache, Map<String, String> stateMap, String resourceName,
+      Partition partition, ClusterStatusMonitor clusterStatusMonitor, long threshold,
+      String topState) {
+    Map<String, Map<String, MissingTopStateRecord>> allMissing = cache.getMissingTopStateMap();
+    String partitionName = partition.getPartitionName();
+    MissingTopStateRecord missing = allMissing.get(resourceName).get(partitionName);
+    long missingSince = missing.getStartTimeStamp();
+    long fromLatency = missing.getUserLatency();
+
+    // Among the replicas now in the top state, take the one whose transition ended earliest:
+    // its end time closes the handoff and its own transition time is the "to" user latency.
+    long earliestEnd = Long.MAX_VALUE;
+    long toLatency = DEFAULT_HANDOFF_USER_LATENCY;
+    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+    for (String instanceName : stateMap.keySet()) {
+      String sessionId = liveInstances.get(instanceName).getSessionId();
+      CurrentState replicaState = cache.getCurrentState(instanceName, sessionId).get(resourceName);
+      if (!replicaState.getState(partitionName).equalsIgnoreCase(topState)) {
+        continue;
+      }
+      long endTime = replicaState.getEndTime(partitionName);
+      if (endTime <= earliestEnd) {
+        earliestEnd = endTime;
+        toLatency = endTime - replicaState.getStartTime(partitionName);
+      }
+    }
+
+    long elapsed = earliestEnd - missingSince;
+    if (missingSince > 0 && elapsed <= threshold) {
+      long userLatency = fromLatency + toLatency;
+      // After a controller leader switch the previous master information may be lost and the
+      // start time approximated by the current time, so the actual user latency can exceed
+      // the estimated duration; report whichever is larger.
+      long duration = Math.max(elapsed, userLatency);
+      boolean isGraceful = missing.isGracefulHandoff();
+      logMissingTopStateInfo(duration, userLatency, isGraceful, partitionName);
+
+      if (clusterStatusMonitor != null) {
+        clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, duration,
+            userLatency, isGraceful, true);
+      }
+    }
+    // The top state is back, so the record is dropped whether or not a metric was reported.
+    removeFromStatsMap(allMissing, resourceName, partition);
+  }
+
+  /** Removes the partition's record, and the resource entry too once it holds no records. */
+  private void removeFromStatsMap(Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap,
+      String resourceName, Partition partition) {
+    Map<String, MissingTopStateRecord> records = missingTopStateMap.get(resourceName);
+    if (records != null) { // unknown resource: nothing to remove (previously an NPE on size())
+      records.remove(partition.getPartitionName());
+      if (records.isEmpty()) {
+        missingTopStateMap.remove(resourceName);
+      }
+    }
+  }
+
+  /** Logs "userLatency/totalDuration" of a finished handoff for the partition, and gracefulness. */
+  private void logMissingTopStateInfo(long totalDuration, long userLatency, boolean isGraceful,
+      String partitionName) {
+    String fmt = "Missing top state duration is %s/%s for partition %s. Graceful: %s";
+    LogUtil.logInfo(LOG, _eventId, String.format(fmt, userLatency, totalDuration, partitionName, isGraceful));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index e3655d8..f870ddc 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -22,6 +22,21 @@ package org.apache.helix.monitoring.mbeans;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -38,22 +53,6 @@ import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
 
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusMonitor.class);
@@ -446,11 +445,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
     }
   }
 
-  public synchronized void updateMissingTopStateDurationStats(String resourceName, long duration, boolean succeeded) {
+  public synchronized void updateMissingTopStateDurationStats(String resourceName,
+      long totalDuration, long userLatency, boolean isGraceful, boolean succeeded) {
     ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
 
     if (resourceMonitor != null) {
-      resourceMonitor.updateStateHandoffStats(ResourceMonitor.MonitorState.TOP_STATE, duration, succeeded);
+      resourceMonitor.updateStateHandoffStats(ResourceMonitor.MonitorState.TOP_STATE, totalDuration,
+          userLatency, isGraceful, succeeded);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index a103a0c..c3dd242 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -21,7 +21,15 @@ package org.apache.helix.monitoring.mbeans;
 
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.ObjectName;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -31,10 +39,6 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
-import javax.management.JMException;
-import javax.management.ObjectName;
-import java.util.*;
-
 public class ResourceMonitor extends DynamicMBeanProvider {
 
   // Gauges
@@ -60,6 +64,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
 
   // Histograms
   private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge;
+  private HistogramDynamicMetric _partitionTopStateHandoffUserLatencyGauge;
+  private HistogramDynamicMetric _partitionTopStateNonGracefulHandoffDurationGauge;
 
   private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
   private long _lastResetTime;
@@ -86,6 +92,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     attributeList.add(_failedTopStateHandoffCounter);
     attributeList.add(_maxSinglePartitionTopStateHandoffDuration);
     attributeList.add(_partitionTopStateHandoffDurationGauge);
+    attributeList.add(_partitionTopStateHandoffUserLatencyGauge);
+    attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge);
     attributeList.add(_totalMessageReceived);
     attributeList.add(_numPendingStateTransitions);
     doRegister(attributeList, _initObjectName);
@@ -96,6 +104,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     TOP_STATE
   }
 
+  @SuppressWarnings("unchecked")
   public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) {
     _clusterName = clusterName;
     _resourceName = resourceName;
@@ -122,6 +131,14 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _partitionTopStateHandoffDurationGauge =
         new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram(
             new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+
+    _partitionTopStateHandoffUserLatencyGauge =
+        new HistogramDynamicMetric("PartitionTopStateHandoffUserLatencyGauge", new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _partitionTopStateNonGracefulHandoffDurationGauge =
+        new HistogramDynamicMetric("PartitionTopStateNonGracefulHandoffGauge", new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+
     _totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0L);
     _maxSinglePartitionTopStateHandoffDuration =
         new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0L);
@@ -165,6 +182,18 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     return _maxSinglePartitionTopStateHandoffDuration.getValue();
   }
 
+  public HistogramDynamicMetric getPartitionTopStateHandoffDurationGauge() {
+    return _partitionTopStateHandoffDurationGauge;
+  }
+
+  public HistogramDynamicMetric getPartitionTopStateNonGracefulHandoffDurationGauge() {
+    return _partitionTopStateNonGracefulHandoffDurationGauge;
+  }
+
+  public HistogramDynamicMetric getPartitionTopStateHandoffUserLatencyGauge() {
+    return _partitionTopStateHandoffUserLatencyGauge;
+  }
+
   public long getFailedTopStateHandoffCounter() {
     return _failedTopStateHandoffCounter.getValue();
   }
@@ -310,25 +339,31 @@ public class ResourceMonitor extends DynamicMBeanProvider {
     _numPendingStateTransitions.updateValue((long) messageCount);
   }
 
-  public void updateStateHandoffStats(MonitorState monitorState, long duration, boolean succeeded) {
+  public void updateStateHandoffStats(MonitorState monitorState, long totalDuration,
+      long userLatency, boolean isGraceful, boolean succeeded) {
     switch (monitorState) {
-      case TOP_STATE:
-        if (succeeded) {
-          _successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue() + 1);
-          _successfulTopStateHandoffDurationCounter
-              .updateValue(_successfulTopStateHandoffDurationCounter.getValue() + duration);
-          _partitionTopStateHandoffDurationGauge.updateValue(duration);
-          if (duration > _maxSinglePartitionTopStateHandoffDuration.getValue()) {
-            _maxSinglePartitionTopStateHandoffDuration.updateValue(duration);
-            _lastResetTime = System.currentTimeMillis();
-          }
+    case TOP_STATE:
+      if (succeeded) {
+        _successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue() + 1);
+        _successfulTopStateHandoffDurationCounter
+            .updateValue(_successfulTopStateHandoffDurationCounter.getValue() + totalDuration);
+        if (isGraceful) {
+          _partitionTopStateHandoffDurationGauge.updateValue(totalDuration);
+          _partitionTopStateHandoffUserLatencyGauge.updateValue(userLatency);
         } else {
-          _failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue() + 1);
+          _partitionTopStateNonGracefulHandoffDurationGauge.updateValue(totalDuration);
         }
-        break;
-      default:
-        _logger.warn(
-            String.format("Wrong monitor state \"%s\" that not supported ", monitorState.name()));
+        if (totalDuration > _maxSinglePartitionTopStateHandoffDuration.getValue()) {
+          _maxSinglePartitionTopStateHandoffDuration.updateValue(totalDuration);
+          _lastResetTime = System.currentTimeMillis();
+        }
+      } else {
+        _failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue() + 1);
+      }
+      break;
+    default:
+      _logger.warn(
+          String.format("Wrong monitor state \"%s\" that not supported ", monitorState.name()));
     }
   }
 
@@ -379,4 +414,4 @@ public class ResourceMonitor extends DynamicMBeanProvider {
       _lastResetTime = System.currentTimeMillis();
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 0225f83..ea529e8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -861,7 +861,7 @@ public class TaskDriver {
     if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) {
       throw new HelixException(String.format(
           "Workflow \"%s\" context is empty or not in states: \"%s\", current state: \"%s\"",
-          workflowName, targetStates.toString(),
+          workflowName, Arrays.asList(targetStates),
           ctx == null ? "null" : ctx.getWorkflowState().toString()));
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 7495078..5509858 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -173,42 +173,56 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
 
   @Test
   public void testReassignment() throws Exception {
-    final int NUM_INSTANCES = 5;
-    String jobName = TestHelper.getTestMethodName();
-    Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+    String workflowName = TestHelper.getTestMethodName();
+    String jobNameSuffix = "job";
+    String jobName = String.format("%s_%s", workflowName, jobNameSuffix);
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+
+    // This is error prone: ThreadCountBasedTaskAssigner always re-assigns the task to
+    // the same instance, given that we only have 1 task to assign and the order of
+    // iterating over all nodes during assignment is always the same. Rarely, some change
+    // will alter the iteration order during assignment, and then this instance name
+    // must be changed to keep on testing this functionality.
+    final String failInstance = "localhost_12919";
     Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true,
-        "failInstance", PARTICIPANT_PREFIX + '_' + (_startPort + 1)));
+        "failInstance", failInstance));
     TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap);
     taskConfigs.add(taskConfig1);
     Map<String, String> jobCommandMap = Maps.newHashMap();
     jobCommandMap.put("Timeout", "1000");
 
-    JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
-        .addTaskConfigs(taskConfigs).setJobCommandConfigMap(jobCommandMap);
-    workflowBuilder.addJob(jobName, jobBuilder);
+    // Retry forever
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+            .setJobCommandConfigMap(jobCommandMap).setMaxAttemptsPerTask(Integer.MAX_VALUE);
+    workflowBuilder.addJob(jobNameSuffix, jobBuilder);
 
     _driver.start(workflowBuilder.build());
 
-    // Ensure the job completes
-    _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
-    _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
+    // Poll to ensure that the task gets re-attempted first
+    int trial = 0;
+    while (trial < 1000) { // 100 sec
+      JobContext jctx = _driver.getJobContext(jobName);
+      if (jctx != null && jctx.getPartitionNumAttempts(0) > 1) {
+        break;
+      }
+      Thread.sleep(100);
+      trial += 1;
+    }
+
+    if (trial == 1000) {
+      Assert.fail("Job " + jobName + " is not retried");
+    }
+
+    // Disable the failing instance so that the task has to be re-assigned elsewhere
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, failInstance, false);
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
 
     // Ensure that the class was invoked
     Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
-
-    // Ensure that this was tried on two different instances, the first of which exhausted the
-    // attempts number, and the other passes on the first try -> See below
-
-    // TEST FIX: After quota-based scheduling support, we use a different assignment strategy (not
-    // consistent hashing), which does not necessarily guarantee that failed tasks will be assigned
-    // on a different instance. The parameters for this test are adjusted accordingly
-    // Also, hard-coding the instance name (line 184) is not a reliable way of testing whether
-    // re-assignment took place, so this test is no longer valid and will always pass
-    Assert.assertEquals(_runCounts.size(), 1);
-    // Assert.assertTrue(
-    // _runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
-    Assert.assertTrue(_runCounts.values().contains(1));
+    Assert.assertFalse(failInstance.equals(_driver.getJobContext(jobName).getAssignedParticipant(0)));
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, failInstance, true);
   }
 
   @Test