You are viewing a plain-text version of this content. The link to its canonical version ("here") was lost in the plain-text conversion.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/30 23:08:12 UTC
[1/2] helix git commit: [HELIX-771] More detailed top state handoff metrics
Repository: helix
Updated Branches:
refs/heads/master 9d7364d7a -> 7e49f995e
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
index fac957c..c1cfce0 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
@@ -21,37 +21,118 @@ package org.apache.helix.monitoring.mbeans;
import com.google.common.collect.Range;
import java.io.IOException;
-import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import org.apache.helix.HelixConstants;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BaseStageTest;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
+import org.apache.helix.controller.stages.TopStateHandoffReportStage;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Resource;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class TestTopStateHandoffMetrics extends BaseStageTest {
public final static String TEST_INPUT_FILE = "TestTopStateHandoffMetrics.json";
- public final static String INITIAL_CURRENT_STATES = "initialCurrentStates";
- public final static String MISSING_TOP_STATES = "MissingTopStates";
- public final static String HANDOFF_CURRENT_STATES = "handoffCurrentStates";
- public final static String EXPECTED_DURATION = "expectedDuration";
public final static String TEST_RESOURCE = "TestResource";
public final static String PARTITION = "PARTITION";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final String NON_GRACEFUL_HANDOFF_DURATION = "PartitionTopStateNonGracefulHandoffGauge.Max";
+ private static final String GRACEFUL_HANDOFF_DURATION = "PartitionTopStateHandoffDurationGauge.Max";
+ private static final String HANDOFF_USER_LATENCY = "PartitionTopStateHandoffUserLatencyGauge.Max";
+ private static final Range<Long> DURATION_ZERO = Range.closed(0L, 0L);
+ private TestConfig config;
+
+ @BeforeClass
+ public void beforeClass() {
+ super.beforeClass();
+ try {
+ config = OBJECT_MAPPER
+ .readValue(getClass().getClassLoader().getResourceAsStream(TEST_INPUT_FILE), TestConfig.class);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static class CurrentStateInfo {
+ String currentState;
+ String previousState;
+ long startTime;
+ long endTime;
+
+ @JsonCreator
+ public CurrentStateInfo(
+ @JsonProperty("CurrentState") String cs,
+ @JsonProperty("PreviousState") String ps,
+ @JsonProperty("StartTime") long start,
+ @JsonProperty("EndTime") long end
+ ) {
+ currentState = cs;
+ previousState = ps;
+ startTime = start;
+ endTime = end;
+ }
+ }
+
+ private static class TestCaseConfig {
+ final Map<String, CurrentStateInfo> initialCurrentStates;
+ final Map<String, CurrentStateInfo> currentStateWithMissingTopState;
+ final Map<String, CurrentStateInfo> finalCurrentState;
+ final long duration;
+ final boolean isGraceful;
+ final long userLatency;
+
+ @JsonCreator
+ public TestCaseConfig(
+ @JsonProperty("InitialCurrentStates") Map<String, CurrentStateInfo> initial,
+ @JsonProperty("MissingTopStates") Map<String, CurrentStateInfo> missing,
+ @JsonProperty("HandoffCurrentStates") Map<String, CurrentStateInfo> handoff,
+ @JsonProperty("Duration") long d,
+ @JsonProperty("UserLatency") long user,
+ @JsonProperty("IsGraceful") boolean graceful
+ ) {
+ initialCurrentStates = initial;
+ currentStateWithMissingTopState = missing;
+ finalCurrentState = handoff;
+ duration = d;
+ userLatency = user;
+ isGraceful = graceful;
+ }
+ }
+
+ private static class TestConfig {
+ final List<TestCaseConfig> succeeded;
+ final List<TestCaseConfig> failed;
+ final List<TestCaseConfig> fast;
+ final List<TestCaseConfig> succeededNonGraceful;
+
+ @JsonCreator
+ public TestConfig(
+ @JsonProperty("succeeded") List<TestCaseConfig> succeededCfg,
+ @JsonProperty("failed") List<TestCaseConfig> failedCfg,
+ @JsonProperty("fast") List<TestCaseConfig> fastCfg,
+ @JsonProperty("succeededNonGraceful") List<TestCaseConfig> nonGraceful
+ ) {
+ succeeded = succeededCfg;
+ failed = failedCfg;
+ fast = fastCfg;
+ succeededNonGraceful = nonGraceful;
+ }
+ }
+
private void preSetup() {
setupLiveInstances(3);
setupStateModel();
@@ -61,165 +142,159 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
event.addAttribute(AttributeName.RESOURCES.name(),
Collections.singletonMap(TEST_RESOURCE, resource));
event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(),
- CurrentStateComputationStage.NOT_RECORDED);
+ TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED);
ClusterStatusMonitor monitor = new ClusterStatusMonitor("TestCluster");
monitor.active();
event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
}
@Test(dataProvider = "successCurrentStateInput")
- public void testTopStateSuccessHandoff(Map<String, Map<String, String>> initialCurrentStates,
- Map<String, Map<String, String>> missingTopStates,
- Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
- preSetup();
- runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates, null, 1, 0,
- Range.closed(expectedDuration, expectedDuration),
- Range.closed(expectedDuration, expectedDuration));
+ public void testTopStateSuccessHandoff(TestCaseConfig cfg) {
+ runTestWithNoInjection(cfg, false);
}
@Test(dataProvider = "fastCurrentStateInput")
- public void testFastTopStateHandoffWithNoMissingTopState(
- Map<String, Map<String, String>> initialCurrentStates,
- Map<String, Map<String, String>> missingTopStates,
- Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration
- ) {
- preSetup();
- runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates, null, 1, 0,
- Range.closed(expectedDuration, expectedDuration),
- Range.closed(expectedDuration, expectedDuration));
+ public void testFastTopStateHandoffWithNoMissingTopState(TestCaseConfig cfg) {
+ runTestWithNoInjection(cfg, false);
}
@Test(dataProvider = "fastCurrentStateInput")
- public void testFastTopStateHandoffWithNoMissingTopStateAndOldInstanceCrash(
- Map<String, Map<String, String>> initialCurrentStates,
- Map<String, Map<String, String>> missingTopStates,
- Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration
- ) {
+ public void testFastTopStateHandoffWithNoMissingTopStateAndOldInstanceCrash(TestCaseConfig cfg) {
preSetup();
event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), 7500L);
// By simulating last master instance crash, we now have:
// - M->S from 6000 to 7000
// - lastPipelineFinishTimestamp is 7500
// - S->M from 8000 to 9000
- // Therefore the recorded latency should be 9000 - 7500 = 1500
+ // Therefore the recorded latency should be 9000 - 7500 = 1500, though original master crashed,
+ // since this is a single top state handoff observed within 1 pipeline, we treat it as graceful,
+ // and only record user latency for transiting to master
+ Range<Long> expectedDuration = Range.closed(1500L, 1500L);
+ Range<Long> expectedUserLatency = Range.closed(1000L, 1000L);
runStageAndVerify(
- initialCurrentStates, missingTopStates, handOffCurrentStates,
+ cfg.initialCurrentStates, cfg.currentStateWithMissingTopState, cfg.finalCurrentState,
new MissingStatesDataCacheInject() {
@Override
public void doInject(ClusterDataCache cache) {
cache.getLiveInstances().remove("localhost_1");
}
}, 1, 0,
- Range.closed(1500L, 1500L),
- Range.closed(1500L, 1500L)
+ expectedDuration,
+ DURATION_ZERO,
+ expectedDuration,
+ expectedUserLatency
);
event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(),
- CurrentStateComputationStage.NOT_RECORDED);
+ TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED);
}
- @Test(dataProvider = "failedCurrentStateInput")
- public void testTopStateFailedHandoff(Map<String, Map<String, String>> initialCurrentStates,
- Map<String, Map<String, String>> missingTopStates,
- Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
+ @Test(dataProvider = "succeededNonGraceful")
+ public void testTopStateSuccessfulYetNonGracefulHandoff(TestCaseConfig cfg) {
+ // localhost_0 crashed at 15000
+ // localhost_1 slave -> master started 20000, ended 22000, top state handoff = 7000
preSetup();
- ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
- clusterConfig.setMissTopStateDurationThreshold(5000L);
- setClusterConfig(clusterConfig);
-
- runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates, null, 0, 1,
- Range.closed(expectedDuration, expectedDuration),
- Range.closed(expectedDuration, expectedDuration));
- }
-
- // Test handoff that are triggered by an offline master instance
- @Test(dataProvider = "successCurrentStateInput", dependsOnMethods = "testTopStateSuccessHandoff")
- public void testTopStateSuccessHandoffWithOfflineNode(
- final Map<String, Map<String, String>> initialCurrentStates,
- final Map<String, Map<String, String>> missingTopStates,
- Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
- final long offlineTimeBeforeMasterless = 125;
-
- preSetup();
- long durationToVerify = expectedDuration + offlineTimeBeforeMasterless;
- runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates,
+ final String downInstance = "localhost_0";
+ final Long lastOfflineTime = 15000L;
+ Range<Long> expectedDuration = Range.closed(7000L, 7000L);
+ runStageAndVerify(
+ cfg.initialCurrentStates, cfg.currentStateWithMissingTopState, cfg.finalCurrentState,
new MissingStatesDataCacheInject() {
@Override
public void doInject(ClusterDataCache cache) {
- Set<String> topStateNodes = new HashSet<>();
- for (String instance : initialCurrentStates.keySet()) {
- if (initialCurrentStates.get(instance).get("CurrentState").equals("MASTER")) {
- topStateNodes.add(instance);
- }
- }
- // Simulate the previous top state instance goes offline
- if (!topStateNodes.isEmpty()) {
- cache.getLiveInstances().keySet().removeAll(topStateNodes);
- for (String topStateNode : topStateNodes) {
- long originalStartTime =
- Long.parseLong(missingTopStates.get(topStateNode).get("StartTime"));
- cache.getInstanceOfflineTimeMap()
- .put(topStateNode, originalStartTime - offlineTimeBeforeMasterless);
- }
- }
+ accessor.removeProperty(accessor.keyBuilder().liveInstance(downInstance));
+ cache.getLiveInstances().remove("localhost_0");
+ cache.getInstanceOfflineTimeMap().put("localhost_0", lastOfflineTime);
+ cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
}
- }, 1, 0, Range.closed(durationToVerify, durationToVerify),
- Range.closed(durationToVerify, durationToVerify));
+ }, 1, 0,
+ DURATION_ZERO, // graceful handoff duration should be 0
+ expectedDuration, // we should have an record for non-graceful handoff
+ expectedDuration, // max handoff should be same as non-graceful handoff
+ DURATION_ZERO // we don't record user latency for non-graceful transition
+ );
+ }
+
+ @Test(dataProvider = "failedCurrentStateInput")
+ public void testTopStateFailedHandoff(TestCaseConfig cfg) {
+ ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
+ clusterConfig.setMissTopStateDurationThreshold(5000L);
+ setClusterConfig(clusterConfig);
+ runTestWithNoInjection(cfg, true);
}
// Test success with no available clue about previous master.
// For example, controller is just changed to a new node.
- @Test(dataProvider = "successCurrentStateInput", dependsOnMethods = "testTopStateSuccessHandoff")
- public void testHandoffDurationWithDefaultStartTime(
- Map<String, Map<String, String>> initialCurrentStates,
- Map<String, Map<String, String>> missingTopStates,
- Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
+ @Test(
+ dataProvider = "successCurrentStateInput",
+ dependsOnMethods = "testHandoffDurationWithPendingMessage"
+ )
+ public void testHandoffDurationWithDefaultStartTime(final TestCaseConfig cfg) {
preSetup();
- // reset expectedDuration since current system time would be used
- for (Map<String, String> states : handOffCurrentStates.values()) {
- if (states.get("CurrentState").equals("MASTER")) {
- expectedDuration = Long.parseLong(states.get("EndTime")) - System.currentTimeMillis();
+ // No initialCurrentStates means no input can be used as the clue of the previous master.
+ // in such case, reportTopStateMissing will use current system time as missing top state
+ // start time, and we assume it is a graceful handoff, and only "to top state" user latency
+ // will be recorded
+ long userLatency = 0;
+ for (CurrentStateInfo info : cfg.currentStateWithMissingTopState.values()) {
+ if (info.previousState.equals("MASTER")) {
+ userLatency = cfg.userLatency - (info.endTime - info.startTime);
+ }
+ info.previousState = "OFFLINE";
+ }
+ for (CurrentStateInfo states : cfg.finalCurrentState.values()) {
+ if (states.currentState.equals("MASTER")) {
+ states.endTime = System.currentTimeMillis();
+ states.startTime = System.currentTimeMillis() - userLatency;
break;
}
}
-
- // No initialCurrentStates means no input can be used as the clue of the previous master.
- runStageAndVerify(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates, null, 1, 0,
- Range.atMost(1500L), Range.atMost(1500L));
+ runStageAndVerify(Collections.EMPTY_MAP, cfg.currentStateWithMissingTopState,
+ cfg.finalCurrentState, null, 1, 0,
+ Range.closed(userLatency, userLatency),
+ DURATION_ZERO,
+ Range.closed(userLatency, userLatency),
+ Range.closed(userLatency, userLatency)
+ );
}
/**
* Test success with only a pending message as the clue.
* For instance, if the master was dropped, there is no way to track the dropping time.
* So either use current system time.
- *
* @see org.apache.helix.monitoring.mbeans.TestTopStateHandoffMetrics#testHandoffDurationWithDefaultStartTime
* Or we can check if any pending message to be used as the start time.
*/
@Test(dataProvider = "successCurrentStateInput", dependsOnMethods = "testTopStateSuccessHandoff")
- public void testHandoffDurationWithPendingMessage(
- final Map<String, Map<String, String>> initialCurrentStates,
- final Map<String, Map<String, String>> missingTopStates,
- Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
+ public void testHandoffDurationWithPendingMessage(final TestCaseConfig cfg) {
final long messageTimeBeforeMasterless = 145;
preSetup();
- long durationToVerify = expectedDuration + messageTimeBeforeMasterless;
+ long durationToVerify = cfg.duration + messageTimeBeforeMasterless;
+ long userLatency = 0;
+ for (CurrentStateInfo info : cfg.finalCurrentState.values()) {
+ if (info.currentState.equals("MASTER")) {
+ userLatency = info.endTime - info.startTime;
+ }
+ }
+
// No initialCurrentStates means no input can be used as the clue of the previous master.
- runStageAndVerify(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates,
+ // in this case, we will treat the handoff as graceful and only to-master user latency
+ // will be recorded
+ runStageAndVerify(
+ Collections.EMPTY_MAP, cfg.currentStateWithMissingTopState, cfg.finalCurrentState,
new MissingStatesDataCacheInject() {
@Override public void doInject(ClusterDataCache cache) {
String topStateNode = null;
- for (String instance : initialCurrentStates.keySet()) {
- if (initialCurrentStates.get(instance).get("CurrentState").equals("MASTER")) {
+ for (String instance : cfg.initialCurrentStates.keySet()) {
+ if (cfg.initialCurrentStates.get(instance).currentState.equals("MASTER")) {
topStateNode = instance;
break;
}
}
// Simulate the previous top state instance goes offline
if (topStateNode != null) {
- long originalStartTime =
- Long.parseLong(missingTopStates.get(topStateNode).get("StartTime"));
+ long originalStartTime = cfg.currentStateWithMissingTopState.get(topStateNode).startTime;
// Inject a message that fit expectedDuration
Message message =
new Message(Message.MessageType.STATE_TRANSITION, "thisisafakemessage");
@@ -232,112 +307,112 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
cache.cacheMessages(Collections.singletonList(message));
}
}
- }, 1, 0, Range.closed(durationToVerify, durationToVerify),
- Range.closed(durationToVerify, durationToVerify));
+ }, 1, 0,
+ Range.closed(durationToVerify, durationToVerify),
+ DURATION_ZERO,
+ Range.closed(durationToVerify, durationToVerify),
+ Range.closed(userLatency, userLatency));
}
- private final String CURRENT_STATE = "CurrentState";
- private final String PREVIOUS_STATE = "PreviousState";
- private final String START_TIME = "StartTime";
- private final String END_TIME = "EndTime";
-
@DataProvider(name = "successCurrentStateInput")
public Object[][] successCurrentState() {
- return loadInputData("succeeded");
+ return testCaseConfigListToObjectArray(config.succeeded);
}
@DataProvider(name = "failedCurrentStateInput")
public Object[][] failedCurrentState() {
- return loadInputData("failed");
+ return testCaseConfigListToObjectArray(config.failed);
}
@DataProvider(name = "fastCurrentStateInput")
public Object[][] fastCurrentState() {
- return loadInputData("fast");
+ return testCaseConfigListToObjectArray(config.fast);
}
- private Object[][] loadInputData(String inputEntry) {
- Object[][] inputData = null;
- InputStream inputStream = getClass().getClassLoader().getResourceAsStream(TEST_INPUT_FILE);
+ @DataProvider(name = "succeededNonGraceful")
+ public Object[][] nonGracefulCurrentState() {
+ return testCaseConfigListToObjectArray(config.succeededNonGraceful);
+ }
- try {
- ObjectReader mapReader = new ObjectMapper().reader(Map.class);
- Map<String, Object> inputMaps = mapReader.readValue(inputStream);
-
- List<Map<String, Object>> inputs = (List<Map<String, Object>>) inputMaps.get(inputEntry);
- inputData = new Object[inputs.size()][];
- for (int i = 0; i < inputData.length; i++) {
- Map<String, Map<String, String>> intialCurrentStates =
- (Map<String, Map<String, String>>) inputs.get(i).get(INITIAL_CURRENT_STATES);
- Map<String, Map<String, String>> missingTopStates =
- (Map<String, Map<String, String>>) inputs.get(i).get(MISSING_TOP_STATES);
- Map<String, Map<String, String>> handoffCurrentStates =
- (Map<String, Map<String, String>>) inputs.get(i).get(HANDOFF_CURRENT_STATES);
- Long expectedDuration = Long.parseLong((String) inputs.get(i).get(EXPECTED_DURATION));
-
- inputData[i] = new Object[] { intialCurrentStates, missingTopStates, handoffCurrentStates,
- expectedDuration
- };
- }
- } catch (IOException e) {
- e.printStackTrace();
+ private Object[][] testCaseConfigListToObjectArray(List<TestCaseConfig> configs) {
+ Object[][] result = new Object[configs.size()][];
+ for (int i = 0; i < configs.size(); i++) {
+ result[i] = new Object[] {configs.get(i)};
}
+ return result;
+ }
- return inputData;
+ private void runTestWithNoInjection(TestCaseConfig cfg, boolean expectFail) {
+ preSetup();
+ Range<Long> duration = Range.closed(cfg.duration, cfg.duration);
+ Range<Long> expectedDuration = cfg.isGraceful ? duration : DURATION_ZERO;
+ Range<Long> expectedNonGracefulDuration = cfg.isGraceful ? DURATION_ZERO : duration;
+ Range<Long> expectedUserLatency =
+ cfg.isGraceful ? Range.closed(cfg.userLatency, cfg.userLatency) : DURATION_ZERO;
+ runStageAndVerify(cfg.initialCurrentStates, cfg.currentStateWithMissingTopState,
+ cfg.finalCurrentState, null, expectFail ? 0 : 1, expectFail ? 1 : 0, expectedDuration, expectedNonGracefulDuration,
+ expectedDuration, expectedUserLatency);
}
private Map<String, CurrentState> generateCurrentStateMap(
- Map<String, Map<String, String>> currentStateRawData) {
+ Map<String, CurrentStateInfo> currentStateRawData) {
Map<String, CurrentState> currentStateMap = new HashMap<String, CurrentState>();
for (String instanceName : currentStateRawData.keySet()) {
- Map<String, String> propertyMap = currentStateRawData.get(instanceName);
+ CurrentStateInfo info = currentStateRawData.get(instanceName);
CurrentState currentState = new CurrentState(TEST_RESOURCE);
currentState.setSessionId(SESSION_PREFIX + instanceName.split("_")[1]);
- currentState.setState(PARTITION, propertyMap.get(CURRENT_STATE));
- currentState.setPreviousState(PARTITION, propertyMap.get(PREVIOUS_STATE));
- currentState.setStartTime(PARTITION, Long.parseLong(propertyMap.get(START_TIME)));
- currentState.setEndTime(PARTITION, Long.parseLong(propertyMap.get(END_TIME)));
+ currentState.setState(PARTITION, info.currentState);
+ currentState.setPreviousState(PARTITION, info.previousState);
+ currentState.setStartTime(PARTITION, info.startTime);
+ currentState.setEndTime(PARTITION, info.endTime);
currentStateMap.put(instanceName, currentState);
}
return currentStateMap;
}
- private void runCurrentStage(Map<String, Map<String, String>> initialCurrentStates,
- Map<String, Map<String, String>> missingTopStates,
- Map<String, Map<String, String>> handOffCurrentStates,
+ private void runPipeLine(Map<String, CurrentStateInfo> initialCurrentStates,
+ Map<String, CurrentStateInfo> missingTopStates,
+ Map<String, CurrentStateInfo> handOffCurrentStates,
MissingStatesDataCacheInject testInjection) {
if (initialCurrentStates != null && !initialCurrentStates.isEmpty()) {
- setupCurrentStates(generateCurrentStateMap(initialCurrentStates));
- runStage(event, new ReadClusterDataStage());
- runStage(event, new CurrentStateComputationStage());
+ doRunStages(initialCurrentStates, null);
}
if (missingTopStates != null && !missingTopStates.isEmpty()) {
- setupCurrentStates(generateCurrentStateMap(missingTopStates));
- runStage(event, new ReadClusterDataStage());
- if (testInjection != null) {
- ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
- testInjection.doInject(cache);
- }
- runStage(event, new CurrentStateComputationStage());
+ doRunStages(missingTopStates, testInjection);
}
if (handOffCurrentStates != null && !handOffCurrentStates.isEmpty()) {
- setupCurrentStates(generateCurrentStateMap(handOffCurrentStates));
- runStage(event, new ReadClusterDataStage());
- runStage(event, new CurrentStateComputationStage());
+ doRunStages(handOffCurrentStates, null);
+ }
+ }
+
+ private void doRunStages(Map<String, CurrentStateInfo> currentStates,
+ MissingStatesDataCacheInject clusterDataInjection) {
+ setupCurrentStates(generateCurrentStateMap(currentStates));
+ runStage(event, new ReadClusterDataStage());
+ if (clusterDataInjection != null) {
+ ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+ clusterDataInjection.doInject(cache);
}
+ runStage(event, new CurrentStateComputationStage());
+ runStage(event, new TopStateHandoffReportStage());
}
private void runStageAndVerify(
- Map<String, Map<String, String>> initialCurrentStates,
- Map<String, Map<String, String>> missingTopStates,
- Map<String, Map<String, String>> handOffCurrentStates, MissingStatesDataCacheInject inject,
- int successCnt, int failCnt,
- Range<Long> expectedDuration, Range<Long> expectedMaxDuration
+ Map<String, CurrentStateInfo> initialCurrentStates,
+ Map<String, CurrentStateInfo> missingTopStates,
+ Map<String, CurrentStateInfo> handOffCurrentStates,
+ MissingStatesDataCacheInject inject,
+ int successCnt,
+ int failCnt,
+ Range<Long> expectedDuration,
+ Range<Long> expectedNonGracefulDuration,
+ Range<Long> expectedMaxDuration,
+ Range<Long> expectedUserLatency
) {
- runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates, inject);
+ runPipeLine(initialCurrentStates, missingTopStates, handOffCurrentStates, inject);
ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
@@ -345,10 +420,17 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), successCnt);
Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), failCnt);
- Assert.assertTrue(
- expectedDuration.contains(monitor.getSuccessfulTopStateHandoffDurationCounter()));
- Assert.assertTrue(
- expectedMaxDuration.contains(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge()));
+ long graceful = monitor.getPartitionTopStateHandoffDurationGauge()
+ .getAttributeValue(GRACEFUL_HANDOFF_DURATION).longValue();
+ long nonGraceful = monitor.getPartitionTopStateNonGracefulHandoffDurationGauge()
+ .getAttributeValue(NON_GRACEFUL_HANDOFF_DURATION).longValue();
+ long user = monitor.getPartitionTopStateHandoffUserLatencyGauge()
+ .getAttributeValue(HANDOFF_USER_LATENCY).longValue();
+ long max = monitor.getMaxSinglePartitionTopStateHandoffDurationGauge();
+ Assert.assertTrue(expectedDuration.contains(graceful));
+ Assert.assertTrue(expectedNonGracefulDuration.contains(nonGraceful));
+ Assert.assertTrue(expectedUserLatency.contains(user));
+ Assert.assertTrue(expectedMaxDuration.contains(max));
}
interface MissingStatesDataCacheInject {
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
----------------------------------------------------------------------
diff --git a/helix-core/src/test/resources/TestTopStateHandoffMetrics.json b/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
index d296e6c..0b39a1d 100644
--- a/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
+++ b/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
@@ -1,306 +1,371 @@
{
"succeeded": [
{
- "initialCurrentStates": {
+ "InitialCurrentStates": {
"localhost_0": {
"CurrentState": "MASTER",
"PreviousState": "SLAVE",
- "StartTime": "10000",
- "EndTime": "11000"
+ "StartTime": 10000,
+ "EndTime": 11000
},
"localhost_1": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
}
},
"MissingTopStates": {
"localhost_0": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "15000",
- "EndTime": "18000"
+ "StartTime": 15000,
+ "EndTime": 18000
},
"localhost_1": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
}
},
- "handoffCurrentStates": {
+ "HandoffCurrentStates": {
"localhost_0": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "15000",
- "EndTime": "18000"
+ "StartTime": 15000,
+ "EndTime": 18000
},
"localhost_1": {
"CurrentState": "MASTER",
"PreviousState": "SLAVE",
- "StartTime": "20000",
- "EndTime": "22000"
+ "StartTime": 20000,
+ "EndTime": 22000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
}
},
- "expectedDuration": "7000"
+ "Duration": 7000,
+ "UserLatency": 5000,
+ "IsGraceful": true
},
{
- "initialCurrentStates": {
+ "InitialCurrentStates": {
"localhost_0": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "1000",
- "EndTime": "2000"
+ "StartTime": 1000,
+ "EndTime": 2000
},
"localhost_1": {
"CurrentState": "MASTER",
"PreviousState": "SLAVE",
- "StartTime": "2500",
- "EndTime": "5000"
+ "StartTime": 2500,
+ "EndTime": 5000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
}
},
"MissingTopStates": {
"localhost_0": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "1000",
- "EndTime": "2000"
+ "StartTime": 1000,
+ "EndTime": 2000
},
"localhost_1": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
}
},
- "handoffCurrentStates": {
+ "HandoffCurrentStates": {
"localhost_0": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "1000",
- "EndTime": "2000"
+ "StartTime": 1000,
+ "EndTime": 2000
},
"localhost_1": {
"CurrentState": "MASTER",
"PreviousState": "SLAVE",
- "StartTime": "20000",
- "EndTime": "22000"
+ "StartTime": 20000,
+ "EndTime": 22000
},
"localhost_2": {
"CurrentState": "ERROR",
"PreviousState": "SLAVE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
}
},
- "expectedDuration": "14000"
+ "Duration": 14000,
+ "UserLatency": 4000,
+ "IsGraceful": true
+ }
+ ],
+ "succeededNonGraceful": [
+ {
+ "InitialCurrentStates": {
+ "localhost_0": {
+ "CurrentState": "MASTER",
+ "PreviousState": "SLAVE",
+ "StartTime": 10000,
+ "EndTime": 11000
+ },
+ "localhost_1": {
+ "CurrentState": "SLAVE",
+ "PreviousState": "OFFLINE",
+ "StartTime": 8000,
+ "EndTime": 10000
+ },
+ "localhost_2": {
+ "CurrentState": "SLAVE",
+ "PreviousState": "OFFLINE",
+ "StartTime": 8000,
+ "EndTime": 10000
+ }
+ },
+ "MissingTopStates": {
+ "localhost_1": {
+ "CurrentState": "SLAVE",
+ "PreviousState": "OFFLINE",
+ "StartTime": 8000,
+ "EndTime": 10000
+ },
+ "localhost_2": {
+ "CurrentState": "SLAVE",
+ "PreviousState": "OFFLINE",
+ "StartTime": 8000,
+ "EndTime": 10000
+ }
+ },
+ "HandoffCurrentStates": {
+ "localhost_1": {
+ "CurrentState": "MASTER",
+ "PreviousState": "SLAVE",
+ "StartTime": 20000,
+ "EndTime": 22000
+ },
+ "localhost_2": {
+ "CurrentState": "SLAVE",
+ "PreviousState": "OFFLINE",
+ "StartTime": 8000,
+ "EndTime": 10000
+ }
+ },
+ "Duration": 7000,
+ "UserLatency": 5000,
+ "IsGraceful": false
}
],
"failed": [
{
- "initialCurrentStates": {
+ "InitialCurrentStates": {
"localhost_0": {
"CurrentState": "MASTER",
"PreviousState": "SLAVE",
- "StartTime": "10000",
- "EndTime": "11000"
+ "StartTime": 10000,
+ "EndTime": 11000
},
"localhost_1": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
}
},
"MissingTopStates": {
"localhost_0": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "15000",
- "EndTime": "18000"
+ "StartTime": 15000,
+ "EndTime": 18000
},
"localhost_1": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
}
},
- "handoffCurrentStates": {
+ "HandoffCurrentStates": {
"localhost_0": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "15000",
- "EndTime": "18000"
+ "StartTime": 15000,
+ "EndTime": 18000
},
"localhost_1": {
"CurrentState": "MASTER",
"PreviousState": "SLAVE",
- "StartTime": "20000",
- "EndTime": "22000"
+ "StartTime": 20000,
+ "EndTime": 22000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
}
},
- "expectedDuration": "0"
+ "Duration": 0,
+ "UserLatency": 0,
+ "IsGraceful": false
},
{
- "initialCurrentStates": {
+ "InitialCurrentStates": {
"localhost_0": {
"CurrentState": "MASTER",
"PreviousState": "SLAVE",
- "StartTime": "10000",
- "EndTime": "11000"
+ "StartTime": 10000,
+ "EndTime": 11000
},
"localhost_1": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
}
},
"MissingTopStates": {
"localhost_0": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "15000",
- "EndTime": "18000"
+ "StartTime": 15000,
+ "EndTime": 18000
},
"localhost_1": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
}
},
- "handoffCurrentStates": {
+ "HandoffCurrentStates": {
"localhost_0": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "15000",
- "EndTime": "18000"
+ "StartTime": 15000,
+ "EndTime": 18000
},
"localhost_1": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "20000",
- "EndTime": "22000"
+ "StartTime": 20000,
+ "EndTime": 22000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "8000",
- "EndTime": "10000"
+ "StartTime": 8000,
+ "EndTime": 10000
}
},
- "expectedDuration": "0"
+ "Duration": 0,
+ "UserLatency": 0,
+ "IsGraceful": false
}
],
"fast": [
{
- "initialCurrentStates": {
+ "InitialCurrentStates": {
"localhost_0": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "1000",
- "EndTime": "2000"
+ "StartTime": 1000,
+ "EndTime": 2000
},
"localhost_1": {
"CurrentState": "MASTER",
"PreviousState": "SLAVE",
- "StartTime": "2500",
- "EndTime": "5000"
+ "StartTime": 2500,
+ "EndTime": 5000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "1000",
- "EndTime": "2000"
+ "StartTime": 1000,
+ "EndTime": 2000
}
},
"MissingTopStates": {
"localhost_0": {
"CurrentState": "MASTER",
"PreviousState": "SLAVE",
- "StartTime": "8000",
- "EndTime": "9000"
+ "StartTime": 8000,
+ "EndTime": 9000
},
"localhost_1": {
"CurrentState": "SLAVE",
"PreviousState": "MASTER",
- "StartTime": "6000",
- "EndTime": "7000"
+ "StartTime": 6000,
+ "EndTime": 7000
},
"localhost_2": {
"CurrentState": "SLAVE",
"PreviousState": "OFFLINE",
- "StartTime": "1000",
- "EndTime": "2000"
+ "StartTime": 1000,
+ "EndTime": 2000
}
},
- "handoffCurrentStates": {
+ "HandoffCurrentStates": {
},
- "expectedDuration": "3000"
+ "Duration": 3000,
+ "UserLatency": 2000,
+ "IsGraceful": true
}
]
}
[2/2] helix git commit: [HELIX-771] More detailed top state handoff
metrics
Posted by jx...@apache.org.
[HELIX-771] More detailed top state handoff metrics
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7e49f995
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7e49f995
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7e49f995
Branch: refs/heads/master
Commit: 7e49f995e29ea200fcc42ce6af148ed521979f5c
Parents: 9d7364d
Author: Harry Zhang <hr...@linkedin.com>
Authored: Tue Oct 30 15:55:20 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Tue Oct 30 15:58:45 2018 -0700
----------------------------------------------------------------------
.../controller/GenericHelixController.java | 3 +-
.../controller/stages/ClusterDataCache.java | 4 +-
.../stages/CurrentStateComputationStage.java | 399 ---------------
.../stages/MissingTopStateRecord.java | 58 +++
.../stages/TaskGarbageCollectionStage.java | 8 +
.../stages/TopStateHandoffReportStage.java | 506 +++++++++++++++++++
.../monitoring/mbeans/ClusterStatusMonitor.java | 37 +-
.../monitoring/mbeans/ResourceMonitor.java | 77 ++-
.../java/org/apache/helix/task/TaskDriver.java | 2 +-
.../task/TestIndependentTaskRebalancer.java | 60 ++-
.../mbeans/TestTopStateHandoffMetrics.java | 418 +++++++++------
.../resources/TestTopStateHandoffMetrics.json | 263 ++++++----
12 files changed, 1103 insertions(+), 732 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 800a331..dd409e5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -269,6 +269,7 @@ public class GenericHelixController implements IdealStateChangeListener,
dataPreprocess.addStage(new ResourceComputationStage());
dataPreprocess.addStage(new ResourceValidationStage());
dataPreprocess.addStage(new CurrentStateComputationStage());
+ dataPreprocess.addStage(new TopStateHandoffReportStage());
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline(pipelineName);
@@ -381,7 +382,7 @@ public class GenericHelixController implements IdealStateChangeListener,
_taskEventThread = new ClusterEventProcessor(_taskCache, _taskEventQueue);
_forceRebalanceTimer = new Timer();
- _lastPipelineEndTimestamp = CurrentStateComputationStage.NOT_RECORDED;
+ _lastPipelineEndTimestamp = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
initializeAsyncFIFOWorkers();
initPipelines(_eventThread, _cache, false);
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index e1a374e..6de6d51 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -85,7 +85,7 @@ public class ClusterDataCache extends AbstractDataCache {
private Map<String, ResourceConfig> _resourceConfigCacheMap;
private Map<String, ClusterConstraints> _constraintMap;
private Map<String, Map<String, String>> _idealStateRuleMap;
- private Map<String, Map<String, Long>> _missingTopStateMap = new HashMap<>();
+ private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap = new HashMap<>();
private Map<String, Map<String, String>> _lastTopStateLocationMap = new HashMap<>();
private Map<String, ExternalView> _targetExternalViewMap = new HashMap<>();
private Map<String, ExternalView> _externalViewMap = new HashMap<>();
@@ -678,7 +678,7 @@ public class ClusterDataCache extends AbstractDataCache {
return null;
}
- public Map<String, Map<String, Long>> getMissingTopStateMap() {
+ public Map<String, Map<String, MissingTopStateRecord>> getMissingTopStateMap() {
return _missingTopStateMap;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 0713070..d4927ec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.stages;
*/
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.controller.LogUtil;
@@ -28,7 +27,6 @@ import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.*;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,16 +39,10 @@ import org.slf4j.LoggerFactory;
public class CurrentStateComputationStage extends AbstractBaseStage {
private static Logger LOG = LoggerFactory.getLogger(CurrentStateComputationStage.class);
- public static final long NOT_RECORDED = -1L;
- public static final long TRANSITION_FAILED = -2L;
- public static final String TASK_STATE_MODEL_NAME = "Task";
-
@Override
public void process(ClusterEvent event) throws Exception {
_eventId = event.getEventId();
ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
- final Long lastPipelineFinishTimestamp = event
- .getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(), NOT_RECORDED);
final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
if (cache == null || resourceMap == null) {
@@ -75,14 +67,6 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
instanceSessionId);
updateCurrentStates(instance, currentStateMap.values(), currentStateOutput, resourceMap);
}
-
- if (!cache.isTaskCache()) {
- ClusterStatusMonitor clusterStatusMonitor =
- event.getAttribute(AttributeName.clusterStatusMonitor.name());
- // TODO Update the status async -- jjwang
- updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput,
- lastPipelineFinishTimestamp);
- }
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
}
@@ -219,387 +203,4 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
currentStateOutput.setCancellationMessage(resourceName, partition, instanceName, message);
}
}
-
- private void updateTopStateStatus(ClusterDataCache cache,
- ClusterStatusMonitor clusterStatusMonitor, Map<String, Resource> resourceMap,
- CurrentStateOutput currentStateOutput,
- long lastPipelineFinishTimestamp) {
- Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
- Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
-
- long durationThreshold = Long.MAX_VALUE;
- if (cache.getClusterConfig() != null) {
- durationThreshold = cache.getClusterConfig().getMissTopStateDurationThreshold();
- }
-
- // Remove any resource records that no longer exists
- missingTopStateMap.keySet().retainAll(resourceMap.keySet());
- lastTopStateMap.keySet().retainAll(resourceMap.keySet());
-
- for (Resource resource : resourceMap.values()) {
- StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
- if (stateModelDef == null || resource.getStateModelDefRef()
- .equalsIgnoreCase(TASK_STATE_MODEL_NAME)) {
- // Resource does not have valid statemodel or it is task state model
- continue;
- }
-
- String resourceName = resource.getResourceName();
-
- for (Partition partition : resource.getPartitions()) {
- String currentTopStateInstance =
- findCurrentTopStateLocation(currentStateOutput, resourceName, partition, stateModelDef);
- String lastTopStateInstance = findCachedTopStateLocation(cache, resourceName, partition);
-
- if (currentTopStateInstance != null) {
- reportTopStateExistance(cache, currentStateOutput, stateModelDef, resourceName, partition,
- lastTopStateInstance, currentTopStateInstance, clusterStatusMonitor,
- durationThreshold, lastPipelineFinishTimestamp);
- updateCachedTopStateLocation(cache, resourceName, partition, currentTopStateInstance);
- } else {
- reportTopStateMissing(cache, missingTopStateMap, lastTopStateMap, resourceName,
- partition, stateModelDef.getTopState(), currentStateOutput);
- reportTopStateHandoffFailIfNecessary(cache, resourceName, partition, durationThreshold,
- clusterStatusMonitor);
- }
- }
- }
-
- if (clusterStatusMonitor != null) {
- clusterStatusMonitor.resetMaxMissingTopStateGauge();
- }
- }
-
- /**
- * From current state output, find out the location of the top state of given resource
- * and partition
- *
- * @param currentStateOutput current state output
- * @param resourceName resource name
- * @param partition partition of the resource
- * @param stateModelDef state model def object
- * @return name of the node that contains top state, null if there is not top state recorded
- */
- private String findCurrentTopStateLocation(CurrentStateOutput currentStateOutput,
- String resourceName, Partition partition, StateModelDefinition stateModelDef) {
- Map<String, String> stateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
- for (String instance : stateMap.keySet()) {
- if (stateMap.get(instance).equals(stateModelDef.getTopState())) {
- return instance;
- }
- }
- return null;
- }
-
- /**
- * Find cached top state location of the given resource and partition
- *
- * @param cache cluster data cache object
- * @param resourceName resource name
- * @param partition partition of the given resource
- * @return cached name of the node that contains top state, null if not previously cached
- */
- private String findCachedTopStateLocation(ClusterDataCache cache, String resourceName, Partition partition) {
- Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
- return lastTopStateMap.containsKey(resourceName) && lastTopStateMap.get(resourceName)
- .containsKey(partition.getPartitionName()) ? lastTopStateMap.get(resourceName)
- .get(partition.getPartitionName()) : null;
- }
-
- /**
- * Update top state location cache of the given resource and partition
- *
- * @param cache cluster data cache object
- * @param resourceName resource name
- * @param partition partition of the given resource
- * @param currentTopStateInstance name of the instance that currently has the top state
- */
- private void updateCachedTopStateLocation(ClusterDataCache cache, String resourceName,
- Partition partition, String currentTopStateInstance) {
- Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
- if (!lastTopStateMap.containsKey(resourceName)) {
- lastTopStateMap.put(resourceName, new HashMap<String, String>());
- }
- lastTopStateMap.get(resourceName).put(partition.getPartitionName(), currentTopStateInstance);
- }
-
- /**
- * When we observe a top state of a given resource and partition, we need to report for the
- * following 2 scenarios:
- * 1. This is a top state come back, i.e. we have a previously missing top state record
- * 2. Top state location change, i.e. current top state location is different from what
- * we saw previously
- *
- * @param cache cluster data cache
- * @param currentStateOutput generated after computiting current state
- * @param stateModelDef State model definition object of the given resource
- * @param resourceName resource name
- * @param partition partition of the given resource
- * @param lastTopStateInstance our cached top state location
- * @param currentTopStateInstance top state location we observed during this pipeline run
- * @param clusterStatusMonitor monitor object
- * @param durationThreshold top state handoff duration threshold
- * @param lastPipelineFinishTimestamp timestamp when last pipeline run finished
- */
- private void reportTopStateExistance(ClusterDataCache cache, CurrentStateOutput currentStateOutput,
- StateModelDefinition stateModelDef, String resourceName, Partition partition,
- String lastTopStateInstance, String currentTopStateInstance,
- ClusterStatusMonitor clusterStatusMonitor, long durationThreshold,
- long lastPipelineFinishTimestamp) {
-
- Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
-
- if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
- .containsKey(partition.getPartitionName())) {
- // We previously recorded a top state missing, and it's not coming back
- reportTopStateComesBack(cache, currentStateOutput.getCurrentStateMap(resourceName, partition),
- missingTopStateMap, resourceName, partition, clusterStatusMonitor, durationThreshold,
- stateModelDef.getTopState());
- } else if (lastTopStateInstance != null && !lastTopStateInstance
- .equals(currentTopStateInstance)) {
- // With no missing top state record, but top state instance changed,
- // we observed an entire top state handoff process
- reportSingleTopStateHandoff(cache, lastTopStateInstance, currentTopStateInstance,
- resourceName, partition, clusterStatusMonitor, lastPipelineFinishTimestamp);
- } else {
- // else, there is not top state change, or top state first came up, do nothing
- LogUtil.logDebug(LOG, _eventId, String.format(
- "No top state hand off or first-seen top state for %s. CurNode: %s, LastNode: %s.",
- partition.getPartitionName(), currentTopStateInstance, lastTopStateInstance));
- }
- }
-
- /**
- * This function calculates duration of a full top state handoff, observed in 1 pipeline run,
- * i.e. current top state instance loaded from ZK is different than the one we cached during
- * last pipeline run.
- *
- * @param cache ClusterDataCache
- * @param lastTopStateInstance Name of last top state instance we cached
- * @param curTopStateInstance Name of current top state instance we refreshed from ZK
- * @param resourceName resource name
- * @param partition partition object
- * @param clusterStatusMonitor cluster state monitor object
- * @param lastPipelineFinishTimestamp last pipeline run finish timestamp
- */
- private void reportSingleTopStateHandoff(ClusterDataCache cache, String lastTopStateInstance,
- String curTopStateInstance, String resourceName, Partition partition,
- ClusterStatusMonitor clusterStatusMonitor, long lastPipelineFinishTimestamp) {
- if (curTopStateInstance.equals(lastTopStateInstance)) {
- return;
- }
-
- // Current state output generation logic guarantees that current top state instance
- // must be a live instance
- String curTopStateSession = cache.getLiveInstances().get(curTopStateInstance).getSessionId();
- long endTime =
- cache.getCurrentState(curTopStateInstance, curTopStateSession).get(resourceName)
- .getEndTime(partition.getPartitionName());
-
- long startTime = NOT_RECORDED;
- if (cache.getLiveInstances().containsKey(lastTopStateInstance)) {
- String lastTopStateSession =
- cache.getLiveInstances().get(lastTopStateInstance).getSessionId();
- // Make sure last top state instance has not bounced during cluster data cache refresh
- // We need this null check as there are test cases creating incomplete current state
- if (cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
- != null) {
- startTime =
- cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
- .getStartTime(partition.getPartitionName());
- }
- }
- if (startTime == NOT_RECORDED) {
- // either cached last top state instance is no longer alive, or it bounced during cluster
- // data cache refresh, we use last pipeline run end time for best guess. Though we can
- // calculate this number in a more precise way by refreshing data from ZK, given the rarity
- // of this corner case, it's not worthy.
- startTime = lastPipelineFinishTimestamp;
- }
-
- if (startTime == NOT_RECORDED || startTime > endTime) {
- // Top state handoff finished before end of last pipeline run, and instance contains
- // previous top state is no longer alive, so our best guess did not work, ignore the
- // data point for now.
- LogUtil.logWarn(LOG, _eventId, String
- .format("Cannot confirm top state missing start time. %s:%s->%s. Likely it was very fast",
- partition.getPartitionName(), lastTopStateInstance, curTopStateInstance));
- return;
- }
-
- LogUtil.logInfo(LOG, _eventId, String.format("Missing topstate duration is %d for partition %s",
- endTime - startTime, partition.getPartitionName()));
- if (clusterStatusMonitor != null) {
- clusterStatusMonitor
- .updateMissingTopStateDurationStats(resourceName, endTime - startTime,
- true);
- }
- }
-
- /**
- * Check if the given partition of the given resource has a missing top state duration larger
- * than the threshold, if so, report a top state transition failure
- *
- * @param cache cluster data cache
- * @param resourceName resource name
- * @param partition partition of the given resource
- * @param durationThreshold top state handoff duration threshold
- * @param clusterStatusMonitor monitor object
- */
- private void reportTopStateHandoffFailIfNecessary(ClusterDataCache cache, String resourceName,
- Partition partition, long durationThreshold, ClusterStatusMonitor clusterStatusMonitor) {
- Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
- String partitionName = partition.getPartitionName();
- Long startTime = missingTopStateMap.get(resourceName).get(partitionName);
- if (startTime != null && startTime > 0
- && System.currentTimeMillis() - startTime > durationThreshold) {
- missingTopStateMap.get(resourceName).put(partitionName, TRANSITION_FAILED);
- if (clusterStatusMonitor != null) {
- clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, false);
- }
- }
- }
-
- /**
- * When we find a top state missing of the given partition, we find out when it started to miss
- * top state, then we record it in cache
- *
- * @param cache cluster data cache
- * @param missingTopStateMap missing top state record
- * @param lastTopStateMap our cached last top state locations
- * @param resourceName resource name
- * @param partition partition of the given resource
- * @param topState top state name
- * @param currentStateOutput current state output
- */
- private void reportTopStateMissing(ClusterDataCache cache,
- Map<String, Map<String, Long>> missingTopStateMap,
- Map<String, Map<String, String>> lastTopStateMap, String resourceName, Partition partition,
- String topState, CurrentStateOutput currentStateOutput) {
- if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
- .containsKey(partition.getPartitionName())) {
- // a previous missing has been already recorded
- return;
- }
-
- long startTime = NOT_RECORDED;
-
- // 1. try to find the previous topstate missing event for the startTime.
- String missingStateInstance = null;
- if (lastTopStateMap.containsKey(resourceName)) {
- missingStateInstance = lastTopStateMap.get(resourceName).get(partition.getPartitionName());
- }
-
- if (missingStateInstance != null) {
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- if (liveInstances.containsKey(missingStateInstance)) {
- CurrentState currentState = cache.getCurrentState(missingStateInstance,
- liveInstances.get(missingStateInstance).getSessionId()).get(resourceName);
-
- if (currentState != null
- && currentState.getPreviousState(partition.getPartitionName()) != null && currentState
- .getPreviousState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
- // Update the latest start time only from top state to other state transition
- // At beginning, the start time should -1 (not recorded). If something happen either
- // instance not alive or the instance just started for that partition, Helix does not know
- // the previous start time or end time. So we count from current.
- //
- // Previous state is top state does not mean that resource has only one top state
- // (i.e. Online/Offline). So Helix has to find the latest start time as the staring point.
- startTime = Math.max(startTime, currentState.getStartTime(partition.getPartitionName()));
- } // Else no related state transition history found, use current time as the missing start time.
- } else {
- // If the previous topState holder is no longer alive, the offline time is used as start time.
- Map<String, Long> offlineMap = cache.getInstanceOfflineTimeMap();
- if (offlineMap.containsKey(missingStateInstance)) {
- startTime = Math.max(startTime, offlineMap.get(missingStateInstance));
- }
- }
- }
-
- // 2. if no previous topstate records, use any pending message that are created for topstate transition
- if (startTime == NOT_RECORDED) {
- for (Message message : currentStateOutput.getPendingMessageMap(resourceName, partition)
- .values()) {
- // Only messages that match the current session ID will be recorded in the map.
- // So no need to redundantly check here.
- if (message.getToState().equals(topState)) {
- startTime = Math.max(startTime, message.getCreateTimeStamp());
- }
- }
- }
-
- // 3. if no clue about previous topstate or any related pending message, use the current system time.
- if (startTime == NOT_RECORDED) {
- LogUtil.logWarn(LOG, _eventId,
- "Cannot confirm top state missing start time. Use the current system time as the start time.");
- startTime = System.currentTimeMillis();
- }
-
- if (!missingTopStateMap.containsKey(resourceName)) {
- missingTopStateMap.put(resourceName, new HashMap<String, Long>());
- }
-
- Map<String, Long> partitionMap = missingTopStateMap.get(resourceName);
- // Update the new partition without top state
- if (!partitionMap.containsKey(partition.getPartitionName())) {
- partitionMap.put(partition.getPartitionName(), startTime);
- }
- }
-
- /**
- * When we see a top state come back, i.e. we observe a top state in this pipeline run,
- * but have a top state missing record before, we need to remove the top state missing
- * record and report top state handoff duration
- *
- * @param cache cluster data cache
- * @param stateMap state map of the given partition of the given resource
- * @param missingTopStateMap missing top state record
- * @param resourceName resource name
- * @param partition partition of the resource
- * @param clusterStatusMonitor monitor object
- * @param threshold top state handoff threshold
- * @param topState name of the top state
- */
- private void reportTopStateComesBack(ClusterDataCache cache, Map<String, String> stateMap,
- Map<String, Map<String, Long>> missingTopStateMap, String resourceName, Partition partition,
- ClusterStatusMonitor clusterStatusMonitor, long threshold, String topState) {
-
- long handOffStartTime = missingTopStateMap.get(resourceName).get(partition.getPartitionName());
-
- // Find the earliest end time from the top states
- long handOffEndTime = System.currentTimeMillis();
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- for (String instanceName : stateMap.keySet()) {
- CurrentState currentState =
- cache.getCurrentState(instanceName, liveInstances.get(instanceName).getSessionId())
- .get(resourceName);
- if (currentState.getState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
- handOffEndTime =
- Math.min(handOffEndTime, currentState.getEndTime(partition.getPartitionName()));
- }
- }
-
- if (handOffStartTime > 0 && handOffEndTime - handOffStartTime <= threshold) {
- LogUtil.logInfo(LOG, _eventId, String.format("Missing topstate duration is %d for partition %s",
- handOffEndTime - handOffStartTime, partition.getPartitionName()));
- if (clusterStatusMonitor != null) {
- clusterStatusMonitor
- .updateMissingTopStateDurationStats(resourceName, handOffEndTime - handOffStartTime,
- true);
- }
- }
- removeFromStatsMap(missingTopStateMap, resourceName, partition);
- }
-
- private void removeFromStatsMap(Map<String, Map<String, Long>> missingTopStateMap,
- String resourceName, Partition partition) {
- if (missingTopStateMap.containsKey(resourceName)) {
- missingTopStateMap.get(resourceName).remove(partition.getPartitionName());
- }
-
- if (missingTopStateMap.get(resourceName).size() == 0) {
- missingTopStateMap.remove(resourceName);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java
new file mode 100644
index 0000000..3aaf326
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java
@@ -0,0 +1,58 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A record entry in cluster data cache containing information about a partition's
+ * missing top state.
+ *
+ * <p>Stored as the value type of {@code ClusterDataCache}'s missing-top-state map
+ * ({@code Map<String, Map<String, MissingTopStateRecord>>}); presumably keyed by resource
+ * name and then partition name, as the map's previous Long-valued version was.
+ *
+ * <p>All fields except the failure flag are fixed at construction. The failure flag starts
+ * as {@code false} and can only be switched to {@code true} via {@link #setFailed()}; this
+ * class never resets it.
+ */
+public class MissingTopStateRecord {
+ // Whether the tracked handoff was classified as graceful; the classification is made by the caller.
+ private final boolean isGracefulHandoff;
+ // When the top state started to be missing. NOTE(review): presumably epoch millis; confirm with the caller.
+ private final long startTimeStamp;
+ // Latency value supplied by the caller. NOTE(review): semantics ("user" part of the handoff) defined by caller; confirm.
+ private final long userLatency;
+ // Only mutable field: flipped to true by setFailed(), never cleared here.
+ private boolean failed;
+
+ /**
+ * Creates a record for a partition whose top state is missing; the record starts as not failed.
+ *
+ * @param start timestamp at which the top state was considered missing
+ * @param user latency value to keep alongside the record
+ * @param graceful whether the handoff is considered graceful
+ */
+ public MissingTopStateRecord(long start, long user, boolean graceful) {
+ isGracefulHandoff = graceful;
+ startTimeStamp = start;
+ userLatency = user;
+ failed = false;
+ }
+
+ /** @return the graceful flag given at construction (package-private accessor) */
+ /* package */ boolean isGracefulHandoff() {
+ return isGracefulHandoff;
+ }
+
+ /** @return the start timestamp given at construction (package-private accessor) */
+ /* package */ long getStartTimeStamp() {
+ return startTimeStamp;
+ }
+
+ /** @return the user latency given at construction (package-private accessor) */
+ /* package */ long getUserLatency() {
+ return userLatency;
+ }
+
+ /** Marks this record as failed; there is no corresponding way to clear the flag. */
+ /* package */ void setFailed() {
+ failed = true;
+ }
+
+ /** @return {@code true} once {@link #setFailed()} has been called, {@code false} otherwise */
+ /* package */ boolean isFailed() {
+ return failed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 247d053..4f417fd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -24,6 +24,14 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
public void execute(ClusterEvent event) {
ClusterDataCache clusterDataCache = event.getAttribute(AttributeName.ClusterDataCache.name());
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+
+ if (clusterDataCache == null || manager == null) {
+ LOG.warn(
+ "ClusterDataCache or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+ event.getEventId(), event.getEventType(), event.getClusterName());
+ return;
+ }
+
Set<WorkflowConfig> existingWorkflows =
new HashSet<>(clusterDataCache.getWorkflowConfigMap().values());
for (WorkflowConfig workflowConfig : existingWorkflows) {
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
new file mode 100644
index 0000000..699f2a8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
@@ -0,0 +1,506 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Observes top state handoffs of partitions and reports handoff latency (total duration and user
+ * latency, graceful vs. non-graceful, success vs. failure) to the {@link ClusterStatusMonitor}.
+ * Missing-top-state records and the last seen top state locations are kept in the
+ * {@link ClusterDataCache} so they survive across pipeline runs.
+ * TODO: make this stage async
+ */
+public class TopStateHandoffReportStage extends AbstractBaseStage {
+  private static final long DEFAULT_HANDOFF_USER_LATENCY = 0L;
+  private static final Logger LOG = LoggerFactory.getLogger(TopStateHandoffReportStage.class);
+  public static final long TIMESTAMP_NOT_RECORDED = -1L;
+
+  /**
+   * Reads the cluster data cache, resources, current state output, last pipeline finish time and
+   * the (optional) cluster status monitor from the event, then updates top state bookkeeping.
+   *
+   * @param event cluster event carrying the stage attributes
+   * @throws StageException if a required attribute is missing, or if the cache is a task cache
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
+    final ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    final Long lastPipelineFinishTimestamp = event
+        .getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(),
+            TIMESTAMP_NOT_RECORDED);
+    final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
+    final CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.name());
+    final ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+
+    if (cache == null || resourceMap == null || currentStateOutput == null) {
+      throw new StageException(
+          "Missing critical attributes for stage, requires ClusterDataCache, RESOURCES and CURRENT_STATE");
+    }
+
+    if (cache.isTaskCache()) {
+      throw new StageException("TopStateHandoffReportStage can only be used in resource pipeline");
+    }
+
+    updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput,
+        lastPipelineFinishTimestamp);
+  }
+
+  /**
+   * Walks every partition of every resource with a known state model and updates the missing top
+   * state records / last top state locations in the cache, reporting metrics along the way.
+   */
+  private void updateTopStateStatus(ClusterDataCache cache,
+      ClusterStatusMonitor clusterStatusMonitor, Map<String, Resource> resourceMap,
+      CurrentStateOutput currentStateOutput, long lastPipelineFinishTimestamp) {
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap =
+        cache.getMissingTopStateMap();
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+
+    long durationThreshold = Long.MAX_VALUE;
+    if (cache.getClusterConfig() != null) {
+      durationThreshold = cache.getClusterConfig().getMissTopStateDurationThreshold();
+    }
+
+    // Remove any resource records that no longer exist
+    missingTopStateMap.keySet().retainAll(resourceMap.keySet());
+    lastTopStateMap.keySet().retainAll(resourceMap.keySet());
+
+    for (Resource resource : resourceMap.values()) {
+      StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
+      if (stateModelDef == null) {
+        // Resource does not have valid state model, just skip processing
+        continue;
+      }
+
+      String resourceName = resource.getResourceName();
+
+      for (Partition partition : resource.getPartitions()) {
+        String currentTopStateInstance =
+            findCurrentTopStateLocation(currentStateOutput, resourceName, partition, stateModelDef);
+        String lastTopStateInstance = findCachedTopStateLocation(cache, resourceName, partition);
+
+        if (currentTopStateInstance != null) {
+          reportTopStateExistence(cache, currentStateOutput, stateModelDef, resourceName, partition,
+              lastTopStateInstance, currentTopStateInstance, clusterStatusMonitor,
+              durationThreshold, lastPipelineFinishTimestamp);
+          updateCachedTopStateLocation(cache, resourceName, partition, currentTopStateInstance);
+        } else {
+          // Record the missing top state (no-op if already recorded), then check whether it has
+          // been missing for longer than the threshold.
+          reportTopStateMissing(cache, resourceName, partition, stateModelDef.getTopState(),
+              currentStateOutput);
+          reportTopStateHandoffFailIfNecessary(cache, resourceName, partition, durationThreshold,
+              clusterStatusMonitor);
+        }
+      }
+    }
+
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.resetMaxMissingTopStateGauge();
+    }
+  }
+
+  /**
+   * From current state output, find out the location of the top state of given resource
+   * and partition
+   *
+   * @param currentStateOutput current state output
+   * @param resourceName resource name
+   * @param partition partition of the resource
+   * @param stateModelDef state model def object
+   * @return name of the node that contains top state, null if there is no top state recorded
+   */
+  private String findCurrentTopStateLocation(CurrentStateOutput currentStateOutput,
+      String resourceName, Partition partition, StateModelDefinition stateModelDef) {
+    Map<String, String> stateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
+    String topState = stateModelDef.getTopState();
+    for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+      if (entry.getValue().equals(topState)) {
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Find cached top state location of the given resource and partition
+   *
+   * @param cache cluster data cache object
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @return cached name of the node that contains top state, null if not previously cached
+   */
+  private String findCachedTopStateLocation(ClusterDataCache cache, String resourceName,
+      Partition partition) {
+    Map<String, String> partitionLocations = cache.getLastTopStateLocationMap().get(resourceName);
+    return partitionLocations == null ? null
+        : partitionLocations.get(partition.getPartitionName());
+  }
+
+  /**
+   * Update top state location cache of the given resource and partition
+   *
+   * @param cache cluster data cache object
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param currentTopStateInstance name of the instance that currently has the top state
+   */
+  private void updateCachedTopStateLocation(ClusterDataCache cache, String resourceName,
+      Partition partition, String currentTopStateInstance) {
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+    Map<String, String> partitionLocations = lastTopStateMap.get(resourceName);
+    if (partitionLocations == null) {
+      partitionLocations = new HashMap<>();
+      lastTopStateMap.put(resourceName, partitionLocations);
+    }
+    partitionLocations.put(partition.getPartitionName(), currentTopStateInstance);
+  }
+
+  /**
+   * When we observe a top state of a given resource and partition, we need to report for the
+   * following 2 scenarios:
+   * 1. This is a top state come back, i.e. we have a previously missing top state record
+   * 2. Top state location change, i.e. current top state location is different from what
+   * we saw previously
+   *
+   * @param cache cluster data cache
+   * @param currentStateOutput generated after computing current state
+   * @param stateModelDef State model definition object of the given resource
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param lastTopStateInstance our cached top state location
+   * @param currentTopStateInstance top state location we observed during this pipeline run
+   * @param clusterStatusMonitor monitor object
+   * @param durationThreshold top state handoff duration threshold
+   * @param lastPipelineFinishTimestamp timestamp when last pipeline run finished
+   */
+  private void reportTopStateExistence(ClusterDataCache cache,
+      CurrentStateOutput currentStateOutput, StateModelDefinition stateModelDef,
+      String resourceName, Partition partition, String lastTopStateInstance,
+      String currentTopStateInstance, ClusterStatusMonitor clusterStatusMonitor,
+      long durationThreshold, long lastPipelineFinishTimestamp) {
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap =
+        cache.getMissingTopStateMap();
+
+    if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
+        .containsKey(partition.getPartitionName())) {
+      // We previously recorded a top state missing, and it's now coming back
+      reportTopStateComesBack(cache, currentStateOutput.getCurrentStateMap(resourceName, partition),
+          resourceName, partition, clusterStatusMonitor, durationThreshold,
+          stateModelDef.getTopState());
+    } else if (lastTopStateInstance != null
+        && !lastTopStateInstance.equals(currentTopStateInstance)) {
+      // With no missing top state record, but top state instance changed,
+      // we observed an entire top state handoff process
+      reportSingleTopStateHandoff(cache, lastTopStateInstance, currentTopStateInstance,
+          resourceName, partition, clusterStatusMonitor, lastPipelineFinishTimestamp);
+    } else {
+      // else, there is no top state change, or top state first came up, do nothing
+      LogUtil.logDebug(LOG, _eventId, String.format(
+          "No top state hand off or first-seen top state for %s. CurNode: %s, LastNode: %s.",
+          partition.getPartitionName(), currentTopStateInstance, lastTopStateInstance));
+    }
+  }
+
+  /**
+   * This function calculates duration of a full top state handoff, observed in 1 pipeline run,
+   * i.e. current top state instance loaded from ZK is different than the one we cached during
+   * last pipeline run.
+   *
+   * @param cache ClusterDataCache
+   * @param lastTopStateInstance Name of last top state instance we cached
+   * @param curTopStateInstance Name of current top state instance we refreshed from ZK
+   * @param resourceName resource name
+   * @param partition partition object
+   * @param clusterStatusMonitor cluster state monitor object
+   * @param lastPipelineFinishTimestamp last pipeline run finish timestamp
+   */
+  private void reportSingleTopStateHandoff(ClusterDataCache cache, String lastTopStateInstance,
+      String curTopStateInstance, String resourceName, Partition partition,
+      ClusterStatusMonitor clusterStatusMonitor, long lastPipelineFinishTimestamp) {
+    if (curTopStateInstance.equals(lastTopStateInstance)) {
+      return;
+    }
+
+    String partitionName = partition.getPartitionName();
+
+    // Current state output generation logic guarantees that current top state instance
+    // must be a live instance
+    String curTopStateSession = cache.getLiveInstances().get(curTopStateInstance).getSessionId();
+    CurrentState curTopStateCurrentState =
+        cache.getCurrentState(curTopStateInstance, curTopStateSession).get(resourceName);
+    long endTime = curTopStateCurrentState.getEndTime(partitionName);
+    long toTopStateUserLatency = endTime - curTopStateCurrentState.getStartTime(partitionName);
+
+    long startTime = TIMESTAMP_NOT_RECORDED;
+    long fromTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+
+    // Make sure last top state instance has not bounced during cluster data cache refresh
+    LiveInstance lastTopStateLiveInstance = cache.getLiveInstances().get(lastTopStateInstance);
+    if (lastTopStateLiveInstance != null) {
+      CurrentState lastTopStateCurrentState = cache
+          .getCurrentState(lastTopStateInstance, lastTopStateLiveInstance.getSessionId())
+          .get(resourceName);
+      // We need this null check as there are test cases creating incomplete current state
+      if (lastTopStateCurrentState != null) {
+        startTime = lastTopStateCurrentState.getStartTime(partitionName);
+        fromTopStateUserLatency = lastTopStateCurrentState.getEndTime(partitionName) - startTime;
+      }
+    }
+    if (startTime == TIMESTAMP_NOT_RECORDED) {
+      // either cached last top state instance is no longer alive, or it bounced during cluster
+      // data cache refresh, we use last pipeline run end time for best guess. Though we can
+      // calculate this number in a more precise way by refreshing data from ZK, given the rarity
+      // of this corner case, it's not worthy.
+      startTime = lastPipelineFinishTimestamp;
+      fromTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+    }
+
+    if (startTime == TIMESTAMP_NOT_RECORDED || startTime > endTime) {
+      // Top state handoff finished before end of last pipeline run, and instance contains
+      // previous top state is no longer alive, so our best guess did not work, ignore the
+      // data point for now.
+      LogUtil.logWarn(LOG, _eventId, String
+          .format("Cannot confirm top state missing start time. %s:%s->%s. Likely it was very fast",
+              partitionName, lastTopStateInstance, curTopStateInstance));
+      return;
+    }
+
+    long duration = endTime - startTime;
+    long userLatency = fromTopStateUserLatency + toTopStateUserLatency;
+    // We always treat such top state handoff as graceful as if top state handoff is triggered
+    // by instance crash, we cannot observe the entire handoff process within 1 pipeline run
+    logMissingTopStateInfo(duration, userLatency, true, partitionName);
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor
+          .updateMissingTopStateDurationStats(resourceName, duration, userLatency, true, true);
+    }
+  }
+
+  /**
+   * Check if the given partition of the given resource has a missing top state duration larger
+   * than the threshold, if so, report a top state transition failure (at most once per record)
+   *
+   * @param cache cluster data cache
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param durationThreshold top state handoff duration threshold
+   * @param clusterStatusMonitor monitor object
+   */
+  private void reportTopStateHandoffFailIfNecessary(ClusterDataCache cache, String resourceName,
+      Partition partition, long durationThreshold, ClusterStatusMonitor clusterStatusMonitor) {
+    Map<String, MissingTopStateRecord> partitionRecords =
+        cache.getMissingTopStateMap().get(resourceName);
+    MissingTopStateRecord record =
+        partitionRecords == null ? null : partitionRecords.get(partition.getPartitionName());
+    if (record == null || record.isFailed()) {
+      // Nothing recorded, or failure already reported for this record
+      return;
+    }
+    long startTime = record.getStartTimeStamp();
+    if (startTime > 0 && System.currentTimeMillis() - startTime > durationThreshold) {
+      // The record is mutated in place; it is already stored in the cache map
+      record.setFailed();
+      if (clusterStatusMonitor != null) {
+        clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, 0L, false, false);
+      }
+    }
+  }
+
+  /**
+   * When we find a top state missing of the given partition, we find out when it started to miss
+   * top state, then we record it in cache
+   *
+   * @param cache cluster data cache
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param topState top state name
+   * @param currentStateOutput current state output
+   */
+  private void reportTopStateMissing(ClusterDataCache cache, String resourceName,
+      Partition partition, String topState, CurrentStateOutput currentStateOutput) {
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap =
+        cache.getMissingTopStateMap();
+    Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+    String partitionName = partition.getPartitionName();
+    if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
+        .containsKey(partitionName)) {
+      // a previous missing has been already recorded
+      return;
+    }
+
+    long startTime = TIMESTAMP_NOT_RECORDED;
+    long fromTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+    boolean isGraceful = true;
+
+    // 1. try to find the previous topstate missing event for the startTime.
+    String missingStateInstance = null;
+    if (lastTopStateMap.containsKey(resourceName)) {
+      missingStateInstance = lastTopStateMap.get(resourceName).get(partitionName);
+    }
+
+    if (missingStateInstance != null) {
+      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+      if (liveInstances.containsKey(missingStateInstance)) {
+        CurrentState currentState = cache.getCurrentState(missingStateInstance,
+            liveInstances.get(missingStateInstance).getSessionId()).get(resourceName);
+        String previousState =
+            currentState == null ? null : currentState.getPreviousState(partitionName);
+
+        if (previousState != null && previousState.equalsIgnoreCase(topState)) {
+          // Update the latest start time only from top state to other state transition
+          // At beginning, the start time should -1 (not recorded). If something happen either
+          // instance not alive or the instance just started for that partition, Helix does not know
+          // the previous start time or end time. So we count from current.
+          //
+          // Previous state is top state does not mean that resource has only one top state
+          // (i.e. Online/Offline). So Helix has to find the latest start time as the staring point.
+          long fromTopStateStartTime = currentState.getStartTime(partitionName);
+          if (fromTopStateStartTime > startTime) {
+            startTime = fromTopStateStartTime;
+            fromTopStateUserLatency = currentState.getEndTime(partitionName) - startTime;
+          }
+        } // Else no related state transition history found, use current time as the missing start time.
+      } else {
+        // If the previous topState holder is no longer alive, the offline time is used as start time.
+        // Also, if we observe a top state missing and the previous top state node is gone, the
+        // top state handoff is not graceful
+        isGraceful = false;
+        Map<String, Long> offlineMap = cache.getInstanceOfflineTimeMap();
+        if (offlineMap.containsKey(missingStateInstance)) {
+          startTime = Math.max(startTime, offlineMap.get(missingStateInstance));
+        }
+      }
+    }
+
+    // 2. if no previous top state records, it's either resource just created or there is a
+    // controller leadership change. Check any pending message that are created for top state
+    // transition. Assume this is graceful top state handoff as if the from top state instance
+    // crashed, we are not recording such message
+    if (startTime == TIMESTAMP_NOT_RECORDED) {
+      for (Message message : currentStateOutput.getPendingMessageMap(resourceName, partition)
+          .values()) {
+        // Only messages that match the current session ID will be recorded in the map.
+        // So no need to redundantly check here.
+        if (message.getToState().equals(topState)) {
+          startTime = Math.max(startTime, message.getCreateTimeStamp());
+        }
+      }
+    }
+
+    // 3. if no clue about previous top state or any related pending message, it could be
+    // a. resource just created
+    // b. controller leader switch (actual hand off could be either graceful or non graceful)
+    //
+    // Use the current system time as missing top state start time and assume always graceful
+    // TODO: revise this case and see if this case can be better addressed
+    if (startTime == TIMESTAMP_NOT_RECORDED) {
+      LogUtil.logWarn(LOG, _eventId,
+          "Cannot confirm top state missing start time. Use the current system time as the start time.");
+      startTime = System.currentTimeMillis();
+    }
+
+    Map<String, MissingTopStateRecord> partitionRecords = missingTopStateMap.get(resourceName);
+    if (partitionRecords == null) {
+      partitionRecords = new HashMap<>();
+      missingTopStateMap.put(resourceName, partitionRecords);
+    }
+    partitionRecords.put(partitionName,
+        new MissingTopStateRecord(startTime, fromTopStateUserLatency, isGraceful));
+  }
+
+  /**
+   * When we see a top state come back, i.e. we observe a top state in this pipeline run,
+   * but have a top state missing record before, we need to remove the top state missing
+   * record and report top state handoff duration
+   *
+   * @param cache cluster data cache
+   * @param stateMap state map of the given partition of the given resource
+   * @param resourceName resource name
+   * @param partition partition of the resource
+   * @param clusterStatusMonitor monitor object
+   * @param threshold top state handoff threshold
+   * @param topState name of the top state
+   */
+  private void reportTopStateComesBack(ClusterDataCache cache, Map<String, String> stateMap,
+      String resourceName, Partition partition, ClusterStatusMonitor clusterStatusMonitor,
+      long threshold, String topState) {
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap =
+        cache.getMissingTopStateMap();
+    String partitionName = partition.getPartitionName();
+    MissingTopStateRecord record = missingTopStateMap.get(resourceName).get(partitionName);
+    long handOffStartTime = record.getStartTimeStamp();
+    long fromTopStateUserLatency = record.getUserLatency();
+
+    // Find the earliest end time from the top states and the corresponding user latency
+    long handOffEndTime = Long.MAX_VALUE;
+    long toTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+    for (String instanceName : stateMap.keySet()) {
+      LiveInstance liveInstance = liveInstances.get(instanceName);
+      if (liveInstance == null) {
+        // Defensive: without a live session there is no current state to read from
+        continue;
+      }
+      CurrentState currentState =
+          cache.getCurrentState(instanceName, liveInstance.getSessionId()).get(resourceName);
+      if (currentState == null) {
+        continue;
+      }
+      String state = currentState.getState(partitionName);
+      if (state != null && state.equalsIgnoreCase(topState)
+          && currentState.getEndTime(partitionName) <= handOffEndTime) {
+        handOffEndTime = currentState.getEndTime(partitionName);
+        toTopStateUserLatency = handOffEndTime - currentState.getStartTime(partitionName);
+      }
+    }
+
+    if (handOffStartTime > 0 && handOffEndTime - handOffStartTime <= threshold) {
+      long duration = handOffEndTime - handOffStartTime;
+      long userLatency = fromTopStateUserLatency + toTopStateUserLatency;
+      // It is possible that during controller leader switch, we lost previous master information
+      // and use current time to approximate missing top state start time. If we see the actual
+      // user latency is larger than the duration we estimated, we use user latency to start with
+      duration = Math.max(duration, userLatency);
+      boolean isGraceful = record.isGracefulHandoff();
+      logMissingTopStateInfo(duration, userLatency, isGraceful, partitionName);
+
+      if (clusterStatusMonitor != null) {
+        clusterStatusMonitor
+            .updateMissingTopStateDurationStats(resourceName, duration, userLatency, isGraceful,
+                true);
+      }
+    }
+    removeFromStatsMap(missingTopStateMap, resourceName, partition);
+  }
+
+  /**
+   * Removes the missing top state record of the given partition, and drops the resource entry
+   * once it has no partition records left. Safe to call when the resource has no records.
+   */
+  private void removeFromStatsMap(
+      Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap, String resourceName,
+      Partition partition) {
+    Map<String, MissingTopStateRecord> partitionRecords = missingTopStateMap.get(resourceName);
+    if (partitionRecords == null) {
+      // Previously this path dereferenced a null map; nothing to remove
+      return;
+    }
+    partitionRecords.remove(partition.getPartitionName());
+    if (partitionRecords.isEmpty()) {
+      missingTopStateMap.remove(resourceName);
+    }
+  }
+
+  private void logMissingTopStateInfo(long totalDuration, long userLatency, boolean isGraceful,
+      String partitionName) {
+    LogUtil.logInfo(LOG, _eventId, String
+        .format("Missing top state duration is %s/%s for partition %s. Graceful: %s", userLatency,
+            totalDuration, partitionName, isGraceful));
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index e3655d8..f870ddc 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -22,6 +22,21 @@ package org.apache.helix.monitoring.mbeans;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -38,22 +53,6 @@ import org.apache.helix.task.WorkflowContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusMonitor.class);
@@ -446,11 +445,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
}
- public synchronized void updateMissingTopStateDurationStats(String resourceName, long duration, boolean succeeded) {
+ public synchronized void updateMissingTopStateDurationStats(String resourceName,
+ long totalDuration, long userLatency, boolean isGraceful, boolean succeeded) {
ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
if (resourceMonitor != null) {
- resourceMonitor.updateStateHandoffStats(ResourceMonitor.MonitorState.TOP_STATE, duration, succeeded);
+ resourceMonitor.updateStateHandoffStats(ResourceMonitor.MonitorState.TOP_STATE, totalDuration,
+ userLatency, isGraceful, succeeded);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index a103a0c..c3dd242 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -21,7 +21,15 @@ package org.apache.helix.monitoring.mbeans;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.ObjectName;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -31,10 +39,6 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
-import javax.management.JMException;
-import javax.management.ObjectName;
-import java.util.*;
-
public class ResourceMonitor extends DynamicMBeanProvider {
// Gauges
@@ -60,6 +64,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
// Histograms
private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge;
+ private HistogramDynamicMetric _partitionTopStateHandoffUserLatencyGauge;
+ private HistogramDynamicMetric _partitionTopStateNonGracefulHandoffDurationGauge;
private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
private long _lastResetTime;
@@ -86,6 +92,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
attributeList.add(_failedTopStateHandoffCounter);
attributeList.add(_maxSinglePartitionTopStateHandoffDuration);
attributeList.add(_partitionTopStateHandoffDurationGauge);
+ attributeList.add(_partitionTopStateHandoffUserLatencyGauge);
+ attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge);
attributeList.add(_totalMessageReceived);
attributeList.add(_numPendingStateTransitions);
doRegister(attributeList, _initObjectName);
@@ -96,6 +104,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
TOP_STATE
}
+ @SuppressWarnings("unchecked")
public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) {
_clusterName = clusterName;
_resourceName = resourceName;
@@ -122,6 +131,14 @@ public class ResourceMonitor extends DynamicMBeanProvider {
_partitionTopStateHandoffDurationGauge =
new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram(
new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+
+ _partitionTopStateHandoffUserLatencyGauge =
+ new HistogramDynamicMetric("PartitionTopStateHandoffUserLatencyGauge", new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _partitionTopStateNonGracefulHandoffDurationGauge =
+ new HistogramDynamicMetric("PartitionTopStateNonGracefulHandoffGauge", new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+
_totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0L);
_maxSinglePartitionTopStateHandoffDuration =
new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0L);
@@ -165,6 +182,18 @@ public class ResourceMonitor extends DynamicMBeanProvider {
return _maxSinglePartitionTopStateHandoffDuration.getValue();
}
+  /**
+   * Exposes the histogram that receives the total duration of successful graceful top state
+   * handoffs.
+   *
+   * @return the "PartitionTopStateHandoffDurationGauge" histogram metric
+   */
+  public HistogramDynamicMetric getPartitionTopStateHandoffDurationGauge() {
+    return this._partitionTopStateHandoffDurationGauge;
+  }
+
+  /**
+   * Exposes the histogram that receives the total duration of successful non-graceful top state
+   * handoffs.
+   *
+   * @return the "PartitionTopStateNonGracefulHandoffGauge" histogram metric
+   */
+  public HistogramDynamicMetric getPartitionTopStateNonGracefulHandoffDurationGauge() {
+    return this._partitionTopStateNonGracefulHandoffDurationGauge;
+  }
+
+  /**
+   * Exposes the histogram that receives the user latency of successful graceful top state
+   * handoffs.
+   *
+   * @return the "PartitionTopStateHandoffUserLatencyGauge" histogram metric
+   */
+  public HistogramDynamicMetric getPartitionTopStateHandoffUserLatencyGauge() {
+    return this._partitionTopStateHandoffUserLatencyGauge;
+  }
+
public long getFailedTopStateHandoffCounter() {
return _failedTopStateHandoffCounter.getValue();
}
@@ -310,25 +339,31 @@ public class ResourceMonitor extends DynamicMBeanProvider {
_numPendingStateTransitions.updateValue((long) messageCount);
}
- public void updateStateHandoffStats(MonitorState monitorState, long duration, boolean succeeded) {
+ public void updateStateHandoffStats(MonitorState monitorState, long totalDuration,
+ long userLatency, boolean isGraceful, boolean succeeded) { // records one handoff; only TOP_STATE is handled, other states are just logged
switch (monitorState) {
- case TOP_STATE:
- if (succeeded) {
- _successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue() + 1);
- _successfulTopStateHandoffDurationCounter
- .updateValue(_successfulTopStateHandoffDurationCounter.getValue() + duration);
- _partitionTopStateHandoffDurationGauge.updateValue(duration);
- if (duration > _maxSinglePartitionTopStateHandoffDuration.getValue()) {
- _maxSinglePartitionTopStateHandoffDuration.updateValue(duration);
- _lastResetTime = System.currentTimeMillis();
- }
+ case TOP_STATE:
+ if (succeeded) {
+ _successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue() + 1);
+ _successfulTopStateHandoffDurationCounter
+ .updateValue(_successfulTopStateHandoffDurationCounter.getValue() + totalDuration);
+ if (isGraceful) { // graceful: feed both the duration and the user-latency histograms
+ _partitionTopStateHandoffDurationGauge.updateValue(totalDuration);
+ _partitionTopStateHandoffUserLatencyGauge.updateValue(userLatency);
} else {
- _failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue() + 1);
+ _partitionTopStateNonGracefulHandoffDurationGauge.updateValue(totalDuration); // non-graceful: userLatency is not recorded
}
- break;
- default:
- _logger.warn(
- String.format("Wrong monitor state \"%s\" that not supported ", monitorState.name()));
+ if (totalDuration > _maxSinglePartitionTopStateHandoffDuration.getValue()) { // max covers graceful and non-graceful handoffs alike
+ _maxSinglePartitionTopStateHandoffDuration.updateValue(totalDuration);
+ _lastResetTime = System.currentTimeMillis();
+ }
+ } else { // failed handoff: only the failure counter changes, durations are ignored
+ _failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue() + 1);
+ }
+ break;
+ default: // unsupported monitor state: warn and record nothing
+ _logger.warn(
+ String.format("Wrong monitor state \"%s\" that not supported ", monitorState.name()));
}
}
@@ -379,4 +414,4 @@ public class ResourceMonitor extends DynamicMBeanProvider {
_lastResetTime = System.currentTimeMillis();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 0225f83..ea529e8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -861,7 +861,7 @@ public class TaskDriver {
if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) {
throw new HelixException(String.format(
"Workflow \"%s\" context is empty or not in states: \"%s\", current state: \"%s\"",
- workflowName, targetStates.toString(),
+ workflowName, Arrays.asList(targetStates),
ctx == null ? "null" : ctx.getWorkflowState().toString()));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 7495078..5509858 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -173,42 +173,56 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
@Test
public void testReassignment() throws Exception {
- final int NUM_INSTANCES = 5;
- String jobName = TestHelper.getTestMethodName();
- Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+ String workflowName = TestHelper.getTestMethodName();
+ String jobNameSuffix = "job";
+ String jobName = String.format("%s_%s", workflowName, jobNameSuffix);
+ Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+
+ // This is fragile: ThreadCountBasedTaskAssigner always re-assigns the task to the
+ // same instance, because there is only 1 task to assign and the order of iterating
+ // over nodes during assignment is always the same. If a change alters that
+ // iteration order, debug the assignment and update this instance name so the
+ // test keeps exercising re-assignment away from the failing instance.
+ final String failInstance = "localhost_12919";
Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true,
- "failInstance", PARTICIPANT_PREFIX + '_' + (_startPort + 1)));
+ "failInstance", failInstance));
TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap);
taskConfigs.add(taskConfig1);
Map<String, String> jobCommandMap = Maps.newHashMap();
jobCommandMap.put("Timeout", "1000");
- JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
- .addTaskConfigs(taskConfigs).setJobCommandConfigMap(jobCommandMap);
- workflowBuilder.addJob(jobName, jobBuilder);
+ // Retry forever
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandMap).setMaxAttemptsPerTask(Integer.MAX_VALUE);
+ workflowBuilder.addJob(jobNameSuffix, jobBuilder);
_driver.start(workflowBuilder.build());
- // Ensure the job completes
- _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
- _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
+ // Poll until the task has been attempted more than once, i.e. it failed and was retried
+ int trial = 0;
+ while (trial < 1000) { // 100 sec
+ JobContext jctx = _driver.getJobContext(jobName);
+ if (jctx != null && jctx.getPartitionNumAttempts(0) > 1) {
+ break;
+ }
+ Thread.sleep(100);
+ trial += 1;
+ }
+
+ if (trial == 1000) {
+ Assert.fail("Job " + jobName + " is not retried");
+ }
+
+ // disable failed instance
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, failInstance, false);
+ _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
// Ensure that the class was invoked
Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
-
- // Ensure that this was tried on two different instances, the first of which exhausted the
- // attempts number, and the other passes on the first try -> See below
-
- // TEST FIX: After quota-based scheduling support, we use a different assignment strategy (not
- // consistent hashing), which does not necessarily guarantee that failed tasks will be assigned
- // on a different instance. The parameters for this test are adjusted accordingly
- // Also, hard-coding the instance name (line 184) is not a reliable way of testing whether
- // re-assignment took place, so this test is no longer valid and will always pass
- Assert.assertEquals(_runCounts.size(), 1);
- // Assert.assertTrue(
- // _runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
- Assert.assertTrue(_runCounts.values().contains(1));
+ Assert.assertNotEquals(_driver.getJobContext(jobName).getAssignedParticipant(0), failInstance); // compare by value: assertNotSame only checks reference identity and would always pass
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, failInstance, true);
}
@Test