You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/25 23:03:11 UTC
helix git commit: [HELIX-753] Record top state handoff finished in
single cluster data cache refresh
Repository: helix
Updated Branches:
refs/heads/master ca364aecf -> 67ff66b48
[HELIX-753] Record top state handoff finished in single cluster data cache refresh
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/67ff66b4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/67ff66b4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/67ff66b4
Branch: refs/heads/master
Commit: 67ff66b4897309c785b8b42863e95734eba81aab
Parents: ca364ae
Author: Harry Zhang <hr...@linkedin.com>
Authored: Fri Sep 21 14:32:15 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Thu Oct 25 16:02:22 2018 -0700
----------------------------------------------------------------------
.../controller/GenericHelixController.java | 17 +-
.../helix/controller/stages/AttributeName.java | 3 +-
.../helix/controller/stages/ClusterEvent.java | 10 +-
.../stages/CurrentStateComputationStage.java | 293 +++++++++++++++----
.../mbeans/TestTopStateHandoffMetrics.java | 182 ++++++------
.../resources/TestTopStateHandoffMetrics.json | 48 +++
6 files changed, 410 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/67ff66b4/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 6fd4623..b7934b5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -142,6 +142,11 @@ public class GenericHelixController implements IdealStateChangeListener,
private ClusterDataCache _taskCache;
private ScheduledExecutorService _asyncTasksThreadPool;
+ /**
+ * Timestamp (epoch ms) at which the last pipeline run finished, or NOT_RECORDED if none yet
+ */
+ private long _lastPipelineEndTimestamp;
+
private String _clusterName;
enum PipelineTypes {
@@ -371,6 +376,7 @@ public class GenericHelixController implements IdealStateChangeListener,
_taskEventThread = new ClusterEventProcessor(_taskCache, _taskEventQueue);
_forceRebalanceTimer = new Timer();
+ _lastPipelineEndTimestamp = CurrentStateComputationStage.NOT_RECORDED;
initializeAsyncFIFOWorkers();
initPipelines(_eventThread, _cache, false);
@@ -461,6 +467,7 @@ public class GenericHelixController implements IdealStateChangeListener,
// add the cache
_cache.setEventId(event.getEventId());
event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+ event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), _lastPipelineEndTimestamp);
List<Pipeline> pipelines = cache.isTaskCache() ?
_taskRegistry.getPipelinesForEvent(event.getEventType()) : _registry
@@ -516,11 +523,12 @@ public class GenericHelixController implements IdealStateChangeListener,
if (!rebalanceFail) {
_continousRebalanceFailureCount = 0;
}
- long endTime = System.currentTimeMillis();
+
+ _lastPipelineEndTimestamp = System.currentTimeMillis();
logger.info(String
.format("END: Invoking %s controller pipeline for event: %s %s for cluster %s, took %d ms",
getPipelineType(cache.isTaskCache()), event.getEventType(), event.getEventId(),
- manager.getClusterName(), (endTime - startTime)));
+ manager.getClusterName(), (_lastPipelineEndTimestamp - startTime)));
if (!cache.isTaskCache()) {
// report event process durations
@@ -546,13 +554,14 @@ public class GenericHelixController implements IdealStateChangeListener,
startTime - enqueueTime);
_clusterStatusMonitor
.updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(),
- endTime - startTime);
+ _lastPipelineEndTimestamp - startTime);
}
sb.append(String.format(
"InQueue time for event: " + event.getEventType() + " took: " + (startTime - enqueueTime)
+ " ms\n"));
sb.append(String.format(
- "TotalProcessed time for event: " + event.getEventType() + " took: " + (endTime
+ "TotalProcessed time for event: " + event.getEventType() + " took: " + (
+ _lastPipelineEndTimestamp
- startTime) + " ms"));
logger.info(sb.toString());
} else if (_isMonitoring) {
http://git-wip-us.apache.org/repos/asf/helix/blob/67ff66b4/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 56bbb44..55f7081 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -37,5 +37,6 @@ public enum AttributeName {
instanceName,
eventData,
AsyncFIFOWorkerPool,
- PipelineType
+ PipelineType,
+ LastRebalanceFinishTimeStamp
}
http://git-wip-us.apache.org/repos/asf/helix/blob/67ff66b4/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
index e82b0e4..a65e9f0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
@@ -93,11 +93,13 @@ public class ClusterEvent {
@SuppressWarnings("unchecked")
public <T extends Object> T getAttribute(String attrName) {
+ return getAttributeWithDefault(attrName, null); // null when the attribute is absent
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends Object> T getAttributeWithDefault(String attrName, T defaultVal) {
Object ret = _eventAttributeMap.get(attrName);
- if (ret != null) {
- return (T) ret;
- }
- return null;
+ return ret == null ? defaultVal : (T) ret; // unchecked cast: caller must request the stored type
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/67ff66b4/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 340a051..0713070 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -19,20 +19,19 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.*;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
-import org.apache.helix.controller.LogUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* For each LiveInstances select currentState and message whose sessionId matches
@@ -42,14 +41,16 @@ import java.util.Map;
public class CurrentStateComputationStage extends AbstractBaseStage {
private static Logger LOG = LoggerFactory.getLogger(CurrentStateComputationStage.class);
- public final long NOT_RECORDED = -1L;
- public final long TRANSITION_FAILED = -2L;
- public final String TASK_STATE_MODEL_NAME = "Task";
+ public static final long NOT_RECORDED = -1L;
+ public static final long TRANSITION_FAILED = -2L;
+ public static final String TASK_STATE_MODEL_NAME = "Task";
@Override
public void process(ClusterEvent event) throws Exception {
_eventId = event.getEventId();
ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+ final Long lastPipelineFinishTimestamp = event
+ .getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(), NOT_RECORDED);
final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
if (cache == null || resourceMap == null) {
@@ -79,7 +80,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
// TODO Update the status async -- jjwang
- updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput);
+ updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput,
+ lastPipelineFinishTimestamp);
}
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
}
@@ -220,7 +222,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
private void updateTopStateStatus(ClusterDataCache cache,
ClusterStatusMonitor clusterStatusMonitor, Map<String, Resource> resourceMap,
- CurrentStateOutput currentStateOutput) {
+ CurrentStateOutput currentStateOutput,
+ long lastPipelineFinishTimestamp) {
Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
@@ -244,53 +247,232 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
String resourceName = resource.getResourceName();
for (Partition partition : resource.getPartitions()) {
- Map<String, String> stateMap =
- currentStateOutput.getCurrentStateMap(resourceName, partition);
-
- for (String instance : stateMap.keySet()) {
- if (stateMap.get(instance).equals(stateModelDef.getTopState())) {
- if (!lastTopStateMap.containsKey(resourceName)) {
- lastTopStateMap.put(resourceName, new HashMap<String, String>());
- }
- // recording any top state, it is enough for tracking the top state changes.
- lastTopStateMap.get(resourceName).put(partition.getPartitionName(), instance);
- break;
- }
- }
-
- if (stateMap.values().contains(stateModelDef.getTopState())) {
- // Top state comes back
- // The first time participant started or controller switched will be ignored
- reportTopStateComesBack(cache, stateMap, missingTopStateMap, resourceName, partition,
- clusterStatusMonitor, durationThreshold, stateModelDef.getTopState());
+ String currentTopStateInstance =
+ findCurrentTopStateLocation(currentStateOutput, resourceName, partition, stateModelDef);
+ String lastTopStateInstance = findCachedTopStateLocation(cache, resourceName, partition);
+
+ if (currentTopStateInstance != null) {
+ reportTopStateExistance(cache, currentStateOutput, stateModelDef, resourceName, partition,
+ lastTopStateInstance, currentTopStateInstance, clusterStatusMonitor,
+ durationThreshold, lastPipelineFinishTimestamp);
+ updateCachedTopStateLocation(cache, resourceName, partition, currentTopStateInstance);
} else {
- // TODO: improve following with MIN_ACTIVE_TOP_STATE logic
- // Missing top state need to record
- reportNewTopStateMissing(cache, missingTopStateMap, lastTopStateMap, resourceName,
+ reportTopStateMissing(cache, missingTopStateMap, lastTopStateMap, resourceName,
partition, stateModelDef.getTopState(), currentStateOutput);
+ reportTopStateHandoffFailIfNecessary(cache, resourceName, partition, durationThreshold,
+ clusterStatusMonitor);
}
}
}
- // Check whether it is already passed threshold
- for (String resourceName : missingTopStateMap.keySet()) {
- for (String partitionName : missingTopStateMap.get(resourceName).keySet()) {
- Long startTime = missingTopStateMap.get(resourceName).get(partitionName);
- if (startTime != null && startTime > 0
- && System.currentTimeMillis() - startTime > durationThreshold) {
- missingTopStateMap.get(resourceName).put(partitionName, TRANSITION_FAILED);
- if (clusterStatusMonitor != null) {
- clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, false);
- }
- }
+ if (clusterStatusMonitor != null) {
+ clusterStatusMonitor.resetMaxMissingTopStateGauge();
+ }
+ }
+
+ /**
+ * From the current state output, find the instance holding the top state of the given
+ * resource and partition; if several instances report it, the first one iterated is returned
+ *
+ * @param currentStateOutput current state output
+ * @param resourceName resource name
+ * @param partition partition of the resource
+ * @param stateModelDef state model def object
+ * @return name of the node that holds the top state, or null if no node holds it
+ */
+ private String findCurrentTopStateLocation(CurrentStateOutput currentStateOutput,
+ String resourceName, Partition partition, StateModelDefinition stateModelDef) {
+ Map<String, String> stateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
+ for (String instance : stateMap.keySet()) {
+ if (stateMap.get(instance).equals(stateModelDef.getTopState())) {
+ return instance;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Find the cached top state location of the given resource and partition
+ *
+ * @param cache cluster data cache object
+ * @param resourceName resource name
+ * @param partition partition of the given resource
+ * @return cached name of the node that contains top state, null if not previously cached
+ */
+ private String findCachedTopStateLocation(ClusterDataCache cache, String resourceName, Partition partition) {
+ Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+ return lastTopStateMap.containsKey(resourceName) && lastTopStateMap.get(resourceName)
+ .containsKey(partition.getPartitionName()) ? lastTopStateMap.get(resourceName)
+ .get(partition.getPartitionName()) : null;
+ }
+
+ /**
+ * Update top state location cache of the given resource and partition
+ *
+ * @param cache cluster data cache object
+ * @param resourceName resource name
+ * @param partition partition of the given resource
+ * @param currentTopStateInstance name of the instance that currently has the top state
+ */
+ private void updateCachedTopStateLocation(ClusterDataCache cache, String resourceName,
+ Partition partition, String currentTopStateInstance) {
+ Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+ if (!lastTopStateMap.containsKey(resourceName)) {
+ lastTopStateMap.put(resourceName, new HashMap<String, String>());
+ }
+ lastTopStateMap.get(resourceName).put(partition.getPartitionName(), currentTopStateInstance);
+ }
+
+ /**
+ * When we observe a top state of a given resource and partition, we report it in the
+ * following 2 scenarios:
+ * 1. The top state comes back, i.e. we have a previously recorded missing top state
+ * 2. The top state location changed, i.e. the current top state location differs from what
+ * we cached previously
+ *
+ * @param cache cluster data cache
+ * @param currentStateOutput output of the current state computation
+ * @param stateModelDef State model definition object of the given resource
+ * @param resourceName resource name
+ * @param partition partition of the given resource
+ * @param lastTopStateInstance our cached top state location
+ * @param currentTopStateInstance top state location we observed during this pipeline run
+ * @param clusterStatusMonitor monitor object
+ * @param durationThreshold top state handoff duration threshold
+ * @param lastPipelineFinishTimestamp timestamp when last pipeline run finished
+ */
+ private void reportTopStateExistance(ClusterDataCache cache, CurrentStateOutput currentStateOutput,
+ StateModelDefinition stateModelDef, String resourceName, Partition partition,
+ String lastTopStateInstance, String currentTopStateInstance,
+ ClusterStatusMonitor clusterStatusMonitor, long durationThreshold,
+ long lastPipelineFinishTimestamp) {
+
+ Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
+
+ if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
+ .containsKey(partition.getPartitionName())) {
+ // We previously recorded a missing top state, and it is now coming back
+ reportTopStateComesBack(cache, currentStateOutput.getCurrentStateMap(resourceName, partition),
+ missingTopStateMap, resourceName, partition, clusterStatusMonitor, durationThreshold,
+ stateModelDef.getTopState());
+ } else if (lastTopStateInstance != null && !lastTopStateInstance
+ .equals(currentTopStateInstance)) {
+ // No missing top state record, but the top state instance changed:
+ // we observed an entire top state handoff within this single cache refresh
+ reportSingleTopStateHandoff(cache, lastTopStateInstance, currentTopStateInstance,
+ resourceName, partition, clusterStatusMonitor, lastPipelineFinishTimestamp);
+ } else {
+ // Otherwise the top state did not move, or it is seen for the first time: only log it
+ LogUtil.logDebug(LOG, _eventId, String.format(
+ "No top state hand off or first-seen top state for %s. CurNode: %s, LastNode: %s.",
+ partition.getPartitionName(), currentTopStateInstance, lastTopStateInstance));
+ }
+ }
+
+ /**
+ * Calculates the duration of a full top state handoff observed within one pipeline run,
+ * i.e. the current top state instance loaded from ZK differs from the one we cached during
+ * the last pipeline run.
+ *
+ * @param cache ClusterDataCache
+ * @param lastTopStateInstance Name of last top state instance we cached
+ * @param curTopStateInstance Name of current top state instance we refreshed from ZK
+ * @param resourceName resource name
+ * @param partition partition object
+ * @param clusterStatusMonitor cluster status monitor object
+ * @param lastPipelineFinishTimestamp last pipeline run finish timestamp
+ */
+ private void reportSingleTopStateHandoff(ClusterDataCache cache, String lastTopStateInstance,
+ String curTopStateInstance, String resourceName, Partition partition,
+ ClusterStatusMonitor clusterStatusMonitor, long lastPipelineFinishTimestamp) {
+ if (curTopStateInstance.equals(lastTopStateInstance)) {
+ return;
+ }
+
+ // Current state output generation logic guarantees that current top state instance
+ // must be a live instance
+ String curTopStateSession = cache.getLiveInstances().get(curTopStateInstance).getSessionId();
+ long endTime =
+ cache.getCurrentState(curTopStateInstance, curTopStateSession).get(resourceName)
+ .getEndTime(partition.getPartitionName());
+
+ long startTime = NOT_RECORDED;
+ if (cache.getLiveInstances().containsKey(lastTopStateInstance)) {
+ String lastTopStateSession =
+ cache.getLiveInstances().get(lastTopStateInstance).getSessionId();
+ // Make sure last top state instance has not bounced during cluster data cache refresh
+ // We need this null check as there are test cases creating incomplete current state
+ if (cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
+ != null) {
+ startTime =
+ cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
+ .getStartTime(partition.getPartitionName());
}
}
+ if (startTime == NOT_RECORDED) {
+ // Either the cached top state instance is no longer alive, or it has no current state for
+ // this resource (e.g. it bounced during the refresh); use the last pipeline end time as a
+ // best guess. A precise value would need a fresh ZK read, which is not worth it given how
+ // rare this corner case is.
+ startTime = lastPipelineFinishTimestamp;
+ }
+
+ if (startTime == NOT_RECORDED || startTime > endTime) {
+ // No usable start time: either nothing was recorded, or the estimate is later than the
+ // time the new top state was reached (e.g. the handoff finished before the last pipeline
+ // run ended while the previous top state instance is gone). Ignore the data point for now.
+ LogUtil.logWarn(LOG, _eventId, String
+ .format("Cannot confirm top state missing start time. %s:%s->%s. Likely it was very fast",
+ partition.getPartitionName(), lastTopStateInstance, curTopStateInstance));
+ return;
+ }
+
+ LogUtil.logInfo(LOG, _eventId, String.format("Missing topstate duration is %d for partition %s",
+ endTime - startTime, partition.getPartitionName()));
if (clusterStatusMonitor != null) {
- clusterStatusMonitor.resetMaxMissingTopStateGauge();
+ clusterStatusMonitor
+ .updateMissingTopStateDurationStats(resourceName, endTime - startTime,
+ true);
+ }
+ }
+
+ /**
+ * Check if the given partition has been missing its top state for longer than the threshold;
+ * if so, mark it TRANSITION_FAILED and report a failure. No-op if no start time is recorded.
+ *
+ * @param cache cluster data cache
+ * @param resourceName resource name
+ * @param partition partition of the given resource
+ * @param durationThreshold top state handoff duration threshold
+ * @param clusterStatusMonitor monitor object
+ */
+ private void reportTopStateHandoffFailIfNecessary(ClusterDataCache cache, String resourceName,
+ Partition partition, long durationThreshold, ClusterStatusMonitor clusterStatusMonitor) {
+ Map<String, Long> missingPartitions = cache.getMissingTopStateMap().get(resourceName);
+ String partitionName = partition.getPartitionName();
+ Long startTime = missingPartitions == null ? null : missingPartitions.get(partitionName);
+ if (startTime != null && startTime > 0
+ && System.currentTimeMillis() - startTime > durationThreshold) {
+ missingPartitions.put(partitionName, TRANSITION_FAILED);
+ if (clusterStatusMonitor != null) {
+ clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, false);
+ }
}
}
- private void reportNewTopStateMissing(ClusterDataCache cache,
+ /**
+ * When we find a top state missing of the given partition, we find out when it started to miss
+ * top state, then we record it in cache
+ *
+ * @param cache cluster data cache
+ * @param missingTopStateMap missing top state record
+ * @param lastTopStateMap our cached last top state locations
+ * @param resourceName resource name
+ * @param partition partition of the given resource
+ * @param topState top state name
+ * @param currentStateOutput current state output
+ */
+ private void reportTopStateMissing(ClusterDataCache cache,
Map<String, Map<String, Long>> missingTopStateMap,
Map<String, Map<String, String>> lastTopStateMap, String resourceName, Partition partition,
String topState, CurrentStateOutput currentStateOutput) {
@@ -365,14 +547,23 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
}
}
+ /**
+ * When we see a top state come back, i.e. we observe a top state in this pipeline run,
+ * but have a top state missing record before, we need to remove the top state missing
+ * record and report top state handoff duration
+ *
+ * @param cache cluster data cache
+ * @param stateMap state map of the given partition of the given resource
+ * @param missingTopStateMap missing top state record
+ * @param resourceName resource name
+ * @param partition partition of the resource
+ * @param clusterStatusMonitor monitor object
+ * @param threshold top state handoff threshold
+ * @param topState name of the top state
+ */
private void reportTopStateComesBack(ClusterDataCache cache, Map<String, String> stateMap,
Map<String, Map<String, Long>> missingTopStateMap, String resourceName, Partition partition,
ClusterStatusMonitor clusterStatusMonitor, long threshold, String topState) {
- if (!missingTopStateMap.containsKey(resourceName) || !missingTopStateMap.get(resourceName)
- .containsKey(partition.getPartitionName())) {
- // there is no previous missing recorded
- return;
- }
long handOffStartTime = missingTopStateMap.get(resourceName).get(partition.getPartitionName());
http://git-wip-us.apache.org/repos/asf/helix/blob/67ff66b4/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
index a60d79f..fac957c 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestTopStateHandoffMetrics.java
@@ -19,6 +19,7 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
+import com.google.common.collect.Range;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
@@ -51,7 +52,7 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
public final static String TEST_RESOURCE = "TestResource";
public final static String PARTITION = "PARTITION";
- public void preSetup() {
+ private void preSetup() {
setupLiveInstances(3);
setupStateModel();
Resource resource = new Resource(TEST_RESOURCE);
@@ -59,6 +60,8 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
resource.addPartition(PARTITION);
event.addAttribute(AttributeName.RESOURCES.name(),
Collections.singletonMap(TEST_RESOURCE, resource));
+ event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(),
+ CurrentStateComputationStage.NOT_RECORDED);
ClusterStatusMonitor monitor = new ClusterStatusMonitor("TestCluster");
monitor.active();
event.addAttribute(AttributeName.clusterStatusMonitor.name(), monitor);
@@ -69,20 +72,49 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
Map<String, Map<String, String>> missingTopStates,
Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
preSetup();
- runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates, null);
- ClusterStatusMonitor clusterStatusMonitor =
- event.getAttribute(AttributeName.clusterStatusMonitor.name());
- ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
+ runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates, null, 1, 0,
+ Range.closed(expectedDuration, expectedDuration),
+ Range.closed(expectedDuration, expectedDuration));
+ }
- // Should have 1 transition succeeded due to threshold.
- Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 1);
- Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 0);
+ @Test(dataProvider = "fastCurrentStateInput")
+ public void testFastTopStateHandoffWithNoMissingTopState(
+ Map<String, Map<String, String>> initialCurrentStates,
+ Map<String, Map<String, String>> missingTopStates,
+ Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration
+ ) {
+ preSetup(); // also resets LastRebalanceFinishTimeStamp to NOT_RECORDED
+ runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates, null, 1, 0,
+ Range.closed(expectedDuration, expectedDuration),
+ Range.closed(expectedDuration, expectedDuration));
+ }
- // Duration should match the expected result
- Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(),
- (long) expectedDuration);
- Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(),
- (long) expectedDuration);
+ @Test(dataProvider = "fastCurrentStateInput")
+ public void testFastTopStateHandoffWithNoMissingTopStateAndOldInstanceCrash(
+ Map<String, Map<String, String>> initialCurrentStates,
+ Map<String, Map<String, String>> missingTopStates,
+ Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration
+ ) {
+ preSetup();
+ event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(), 7500L);
+ // By removing the last master instance from the live instances (simulated crash), we have:
+ // - M->S from 6000 to 7000
+ // - lastPipelineFinishTimestamp is 7500
+ // - S->M from 8000 to 9000
+ // Therefore the recorded latency should be 9000 - 7500 = 1500
+ runStageAndVerify(
+ initialCurrentStates, missingTopStates, handOffCurrentStates,
+ new MissingStatesDataCacheInject() {
+ @Override
+ public void doInject(ClusterDataCache cache) {
+ cache.getLiveInstances().remove("localhost_1");
+ }
+ }, 1, 0,
+ Range.closed(1500L, 1500L),
+ Range.closed(1500L, 1500L)
+ );
+ event.addAttribute(AttributeName.LastRebalanceFinishTimeStamp.name(),
+ CurrentStateComputationStage.NOT_RECORDED); // restore the default set by preSetup()
}
@Test(dataProvider = "failedCurrentStateInput")
@@ -93,20 +125,10 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
ClusterConfig clusterConfig = new ClusterConfig(_clusterName);
clusterConfig.setMissTopStateDurationThreshold(5000L);
setClusterConfig(clusterConfig);
- runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates, null);
- ClusterStatusMonitor clusterStatusMonitor =
- event.getAttribute(AttributeName.clusterStatusMonitor.name());
- ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
-
- // Should have 1 transition failed due to threshold.
- Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 0);
- Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 1);
- // No duration updated.
- Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(),
- (long) expectedDuration);
- Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(),
- (long) expectedDuration);
+ runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates, null, 0, 1,
+ Range.closed(expectedDuration, expectedDuration),
+ Range.closed(expectedDuration, expectedDuration));
}
// Test handoff that are triggered by an offline master instance
@@ -118,7 +140,8 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
final long offlineTimeBeforeMasterless = 125;
preSetup();
- runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates,
+ long durationToVerify = expectedDuration + offlineTimeBeforeMasterless;
+ runStageAndVerify(initialCurrentStates, missingTopStates, handOffCurrentStates,
new MissingStatesDataCacheInject() {
@Override
public void doInject(ClusterDataCache cache) {
@@ -139,20 +162,8 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
}
}
}
- });
- ClusterStatusMonitor clusterStatusMonitor =
- event.getAttribute(AttributeName.clusterStatusMonitor.name());
- ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
-
- // Should have 1 transition succeeded due to threshold.
- Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 1);
- Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 0);
-
- // Duration should match the expected result
- Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(),
- expectedDuration + offlineTimeBeforeMasterless);
- Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(),
- expectedDuration + offlineTimeBeforeMasterless);
+ }, 1, 0, Range.closed(durationToVerify, durationToVerify),
+ Range.closed(durationToVerify, durationToVerify));
}
// Test success with no available clue about previous master.
@@ -173,21 +184,8 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
}
// No initialCurrentStates means no input can be used as the clue of the previous master.
- runCurrentStage(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates, null);
- ClusterStatusMonitor clusterStatusMonitor =
- event.getAttribute(AttributeName.clusterStatusMonitor.name());
- ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
-
- // Should have 1 transition succeeded due to threshold.
- Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 1);
- Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 0);
-
- // Duration should match the expected result.
- // Note that the gap between expectedDuration calculated and monitor calculates the value should be allowed
- Assert.assertTrue(
- expectedDuration - monitor.getSuccessfulTopStateHandoffDurationCounter() < 1500);
- Assert.assertTrue(
- expectedDuration - monitor.getMaxSinglePartitionTopStateHandoffDurationGauge() < 1500);
+ runStageAndVerify(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates, null, 1, 0,
+ Range.atMost(1500L), Range.atMost(1500L));
}
/**
@@ -205,11 +203,12 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
Map<String, Map<String, String>> handOffCurrentStates, Long expectedDuration) {
final long messageTimeBeforeMasterless = 145;
preSetup();
+
+ long durationToVerify = expectedDuration + messageTimeBeforeMasterless;
// No initialCurrentStates means no input can be used as the clue of the previous master.
- runCurrentStage(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates,
+ runStageAndVerify(Collections.EMPTY_MAP, missingTopStates, handOffCurrentStates,
new MissingStatesDataCacheInject() {
- @Override
- public void doInject(ClusterDataCache cache) {
+ @Override public void doInject(ClusterDataCache cache) {
String topStateNode = null;
for (String instance : initialCurrentStates.keySet()) {
if (initialCurrentStates.get(instance).get("CurrentState").equals("MASTER")) {
@@ -233,21 +232,8 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
cache.cacheMessages(Collections.singletonList(message));
}
}
- });
-
- ClusterStatusMonitor clusterStatusMonitor =
- event.getAttribute(AttributeName.clusterStatusMonitor.name());
- ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
-
- // Should have 1 transition succeeded due to threshold.
- Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), 1);
- Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), 0);
-
- // Duration should match the expected result
- Assert.assertEquals(monitor.getSuccessfulTopStateHandoffDurationCounter(),
- expectedDuration + messageTimeBeforeMasterless);
- Assert.assertEquals(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge(),
- expectedDuration + messageTimeBeforeMasterless);
+ }, 1, 0, Range.closed(durationToVerify, durationToVerify),
+ Range.closed(durationToVerify, durationToVerify));
}
private final String CURRENT_STATE = "CurrentState";
@@ -265,6 +251,11 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
return loadInputData("failed");
}
+ @DataProvider(name = "fastCurrentStateInput")
+ public Object[][] fastCurrentState() {
+ return loadInputData("fast");
+ }
+
private Object[][] loadInputData(String inputEntry) {
Object[][] inputData = null;
InputStream inputStream = getClass().getClassLoader().getResourceAsStream(TEST_INPUT_FILE);
@@ -322,17 +313,42 @@ public class TestTopStateHandoffMetrics extends BaseStageTest {
runStage(event, new CurrentStateComputationStage());
}
- setupCurrentStates(generateCurrentStateMap(missingTopStates));
- runStage(event, new ReadClusterDataStage());
- if (testInjection != null) {
- ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
- testInjection.doInject(cache);
+ if (missingTopStates != null && !missingTopStates.isEmpty()) {
+ setupCurrentStates(generateCurrentStateMap(missingTopStates));
+ runStage(event, new ReadClusterDataStage());
+ if (testInjection != null) {
+ ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
+ testInjection.doInject(cache);
+ }
+ runStage(event, new CurrentStateComputationStage());
}
- runStage(event, new CurrentStateComputationStage());
- setupCurrentStates(generateCurrentStateMap(handOffCurrentStates));
- runStage(event, new ReadClusterDataStage());
- runStage(event, new CurrentStateComputationStage());
+ if (handOffCurrentStates != null && !handOffCurrentStates.isEmpty()) {
+ setupCurrentStates(generateCurrentStateMap(handOffCurrentStates));
+ runStage(event, new ReadClusterDataStage());
+ runStage(event, new CurrentStateComputationStage());
+ }
+ }
+
+ private void runStageAndVerify(
+ Map<String, Map<String, String>> initialCurrentStates,
+ Map<String, Map<String, String>> missingTopStates,
+ Map<String, Map<String, String>> handOffCurrentStates, MissingStatesDataCacheInject inject,
+ int successCnt, int failCnt,
+ Range<Long> expectedDuration, Range<Long> expectedMaxDuration
+ ) {
+ runCurrentStage(initialCurrentStates, missingTopStates, handOffCurrentStates, inject);
+ ClusterStatusMonitor clusterStatusMonitor =
+ event.getAttribute(AttributeName.clusterStatusMonitor.name());
+ ResourceMonitor monitor = clusterStatusMonitor.getResourceMonitor(TEST_RESOURCE);
+
+ Assert.assertEquals(monitor.getSucceededTopStateHandoffCounter(), successCnt);
+ Assert.assertEquals(monitor.getFailedTopStateHandoffCounter(), failCnt);
+
+ Assert.assertTrue(
+ expectedDuration.contains(monitor.getSuccessfulTopStateHandoffDurationCounter()));
+ Assert.assertTrue(
+ expectedMaxDuration.contains(monitor.getMaxSinglePartitionTopStateHandoffDurationGauge()));
}
interface MissingStatesDataCacheInject {
http://git-wip-us.apache.org/repos/asf/helix/blob/67ff66b4/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
----------------------------------------------------------------------
diff --git a/helix-core/src/test/resources/TestTopStateHandoffMetrics.json b/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
index 41393ea..d296e6c 100644
--- a/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
+++ b/helix-core/src/test/resources/TestTopStateHandoffMetrics.json
@@ -254,5 +254,53 @@
},
"expectedDuration": "0"
}
+ ],
+ "fast": [
+ {
+ "initialCurrentStates": {
+ "localhost_0": {
+ "CurrentState": "SLAVE",
+ "PreviousState": "OFFLINE",
+ "StartTime": "1000",
+ "EndTime": "2000"
+ },
+ "localhost_1": {
+ "CurrentState": "MASTER",
+ "PreviousState": "SLAVE",
+ "StartTime": "2500",
+ "EndTime": "5000"
+ },
+ "localhost_2": {
+ "CurrentState": "SLAVE",
+ "PreviousState": "OFFLINE",
+ "StartTime": "1000",
+ "EndTime": "2000"
+ }
+ },
+ "MissingTopStates": {
+ "localhost_0": {
+ "CurrentState": "MASTER",
+ "PreviousState": "SLAVE",
+ "StartTime": "8000",
+ "EndTime": "9000"
+ },
+ "localhost_1": {
+ "CurrentState": "SLAVE",
+ "PreviousState": "MASTER",
+ "StartTime": "6000",
+ "EndTime": "7000"
+ },
+ "localhost_2": {
+ "CurrentState": "SLAVE",
+ "PreviousState": "OFFLINE",
+ "StartTime": "1000",
+ "EndTime": "2000"
+ }
+ },
+ "handoffCurrentStates": {
+
+ },
+ "expectedDuration": "3000"
+ }
]
}