You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/30 23:08:13 UTC
[2/2] helix git commit: [HELIX-771] More detailed top state handoff metrics
[HELIX-771] More detailed top state handoff metrics
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7e49f995
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7e49f995
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7e49f995
Branch: refs/heads/master
Commit: 7e49f995e29ea200fcc42ce6af148ed521979f5c
Parents: 9d7364d
Author: Harry Zhang <hr...@linkedin.com>
Authored: Tue Oct 30 15:55:20 2018 -0700
Committer: Harry Zhang <hr...@linkedin.com>
Committed: Tue Oct 30 15:58:45 2018 -0700
----------------------------------------------------------------------
.../controller/GenericHelixController.java | 3 +-
.../controller/stages/ClusterDataCache.java | 4 +-
.../stages/CurrentStateComputationStage.java | 399 ---------------
.../stages/MissingTopStateRecord.java | 58 +++
.../stages/TaskGarbageCollectionStage.java | 8 +
.../stages/TopStateHandoffReportStage.java | 506 +++++++++++++++++++
.../monitoring/mbeans/ClusterStatusMonitor.java | 37 +-
.../monitoring/mbeans/ResourceMonitor.java | 77 ++-
.../java/org/apache/helix/task/TaskDriver.java | 2 +-
.../task/TestIndependentTaskRebalancer.java | 60 ++-
.../mbeans/TestTopStateHandoffMetrics.java | 418 +++++++++------
.../resources/TestTopStateHandoffMetrics.json | 263 ++++++----
12 files changed, 1103 insertions(+), 732 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 800a331..dd409e5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -269,6 +269,7 @@ public class GenericHelixController implements IdealStateChangeListener,
dataPreprocess.addStage(new ResourceComputationStage());
dataPreprocess.addStage(new ResourceValidationStage());
dataPreprocess.addStage(new CurrentStateComputationStage());
+ dataPreprocess.addStage(new TopStateHandoffReportStage());
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline(pipelineName);
@@ -381,7 +382,7 @@ public class GenericHelixController implements IdealStateChangeListener,
_taskEventThread = new ClusterEventProcessor(_taskCache, _taskEventQueue);
_forceRebalanceTimer = new Timer();
- _lastPipelineEndTimestamp = CurrentStateComputationStage.NOT_RECORDED;
+ _lastPipelineEndTimestamp = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
initializeAsyncFIFOWorkers();
initPipelines(_eventThread, _cache, false);
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index e1a374e..6de6d51 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -85,7 +85,7 @@ public class ClusterDataCache extends AbstractDataCache {
private Map<String, ResourceConfig> _resourceConfigCacheMap;
private Map<String, ClusterConstraints> _constraintMap;
private Map<String, Map<String, String>> _idealStateRuleMap;
- private Map<String, Map<String, Long>> _missingTopStateMap = new HashMap<>();
+ private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap = new HashMap<>();
private Map<String, Map<String, String>> _lastTopStateLocationMap = new HashMap<>();
private Map<String, ExternalView> _targetExternalViewMap = new HashMap<>();
private Map<String, ExternalView> _externalViewMap = new HashMap<>();
@@ -678,7 +678,7 @@ public class ClusterDataCache extends AbstractDataCache {
return null;
}
- public Map<String, Map<String, Long>> getMissingTopStateMap() {
+ public Map<String, Map<String, MissingTopStateRecord>> getMissingTopStateMap() {
return _missingTopStateMap;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 0713070..d4927ec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -20,7 +20,6 @@ package org.apache.helix.controller.stages;
*/
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.controller.LogUtil;
@@ -28,7 +27,6 @@ import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.*;
import org.apache.helix.model.Message.MessageType;
-import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,16 +39,10 @@ import org.slf4j.LoggerFactory;
public class CurrentStateComputationStage extends AbstractBaseStage {
private static Logger LOG = LoggerFactory.getLogger(CurrentStateComputationStage.class);
- public static final long NOT_RECORDED = -1L;
- public static final long TRANSITION_FAILED = -2L;
- public static final String TASK_STATE_MODEL_NAME = "Task";
-
@Override
public void process(ClusterEvent event) throws Exception {
_eventId = event.getEventId();
ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
- final Long lastPipelineFinishTimestamp = event
- .getAttributeWithDefault(AttributeName.LastRebalanceFinishTimeStamp.name(), NOT_RECORDED);
final Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
if (cache == null || resourceMap == null) {
@@ -75,14 +67,6 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
instanceSessionId);
updateCurrentStates(instance, currentStateMap.values(), currentStateOutput, resourceMap);
}
-
- if (!cache.isTaskCache()) {
- ClusterStatusMonitor clusterStatusMonitor =
- event.getAttribute(AttributeName.clusterStatusMonitor.name());
- // TODO Update the status async -- jjwang
- updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput,
- lastPipelineFinishTimestamp);
- }
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
}
@@ -219,387 +203,4 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
currentStateOutput.setCancellationMessage(resourceName, partition, instanceName, message);
}
}
-
- private void updateTopStateStatus(ClusterDataCache cache,
- ClusterStatusMonitor clusterStatusMonitor, Map<String, Resource> resourceMap,
- CurrentStateOutput currentStateOutput,
- long lastPipelineFinishTimestamp) {
- Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
- Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
-
- long durationThreshold = Long.MAX_VALUE;
- if (cache.getClusterConfig() != null) {
- durationThreshold = cache.getClusterConfig().getMissTopStateDurationThreshold();
- }
-
- // Remove any resource records that no longer exists
- missingTopStateMap.keySet().retainAll(resourceMap.keySet());
- lastTopStateMap.keySet().retainAll(resourceMap.keySet());
-
- for (Resource resource : resourceMap.values()) {
- StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
- if (stateModelDef == null || resource.getStateModelDefRef()
- .equalsIgnoreCase(TASK_STATE_MODEL_NAME)) {
- // Resource does not have valid statemodel or it is task state model
- continue;
- }
-
- String resourceName = resource.getResourceName();
-
- for (Partition partition : resource.getPartitions()) {
- String currentTopStateInstance =
- findCurrentTopStateLocation(currentStateOutput, resourceName, partition, stateModelDef);
- String lastTopStateInstance = findCachedTopStateLocation(cache, resourceName, partition);
-
- if (currentTopStateInstance != null) {
- reportTopStateExistance(cache, currentStateOutput, stateModelDef, resourceName, partition,
- lastTopStateInstance, currentTopStateInstance, clusterStatusMonitor,
- durationThreshold, lastPipelineFinishTimestamp);
- updateCachedTopStateLocation(cache, resourceName, partition, currentTopStateInstance);
- } else {
- reportTopStateMissing(cache, missingTopStateMap, lastTopStateMap, resourceName,
- partition, stateModelDef.getTopState(), currentStateOutput);
- reportTopStateHandoffFailIfNecessary(cache, resourceName, partition, durationThreshold,
- clusterStatusMonitor);
- }
- }
- }
-
- if (clusterStatusMonitor != null) {
- clusterStatusMonitor.resetMaxMissingTopStateGauge();
- }
- }
-
- /**
- * From current state output, find out the location of the top state of given resource
- * and partition
- *
- * @param currentStateOutput current state output
- * @param resourceName resource name
- * @param partition partition of the resource
- * @param stateModelDef state model def object
- * @return name of the node that contains top state, null if there is no top state recorded
- */
- private String findCurrentTopStateLocation(CurrentStateOutput currentStateOutput,
- String resourceName, Partition partition, StateModelDefinition stateModelDef) {
- Map<String, String> stateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
- for (String instance : stateMap.keySet()) {
- if (stateMap.get(instance).equals(stateModelDef.getTopState())) {
- return instance;
- }
- }
- return null;
- }
-
- /**
- * Find cached top state location of the given resource and partition
- *
- * @param cache cluster data cache object
- * @param resourceName resource name
- * @param partition partition of the given resource
- * @return cached name of the node that contains top state, null if not previously cached
- */
- private String findCachedTopStateLocation(ClusterDataCache cache, String resourceName, Partition partition) {
- Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
- return lastTopStateMap.containsKey(resourceName) && lastTopStateMap.get(resourceName)
- .containsKey(partition.getPartitionName()) ? lastTopStateMap.get(resourceName)
- .get(partition.getPartitionName()) : null;
- }
-
- /**
- * Update top state location cache of the given resource and partition
- *
- * @param cache cluster data cache object
- * @param resourceName resource name
- * @param partition partition of the given resource
- * @param currentTopStateInstance name of the instance that currently has the top state
- */
- private void updateCachedTopStateLocation(ClusterDataCache cache, String resourceName,
- Partition partition, String currentTopStateInstance) {
- Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
- if (!lastTopStateMap.containsKey(resourceName)) {
- lastTopStateMap.put(resourceName, new HashMap<String, String>());
- }
- lastTopStateMap.get(resourceName).put(partition.getPartitionName(), currentTopStateInstance);
- }
-
- /**
- * When we observe a top state of a given resource and partition, we need to report for the
- * following 2 scenarios:
- * 1. This is a top state come back, i.e. we have a previously missing top state record
- * 2. Top state location change, i.e. current top state location is different from what
- * we saw previously
- *
- * @param cache cluster data cache
- * @param currentStateOutput generated after computiting current state
- * @param stateModelDef State model definition object of the given resource
- * @param resourceName resource name
- * @param partition partition of the given resource
- * @param lastTopStateInstance our cached top state location
- * @param currentTopStateInstance top state location we observed during this pipeline run
- * @param clusterStatusMonitor monitor object
- * @param durationThreshold top state handoff duration threshold
- * @param lastPipelineFinishTimestamp timestamp when last pipeline run finished
- */
- private void reportTopStateExistance(ClusterDataCache cache, CurrentStateOutput currentStateOutput,
- StateModelDefinition stateModelDef, String resourceName, Partition partition,
- String lastTopStateInstance, String currentTopStateInstance,
- ClusterStatusMonitor clusterStatusMonitor, long durationThreshold,
- long lastPipelineFinishTimestamp) {
-
- Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
-
- if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
- .containsKey(partition.getPartitionName())) {
- // We previously recorded a top state missing, and it's now coming back
- reportTopStateComesBack(cache, currentStateOutput.getCurrentStateMap(resourceName, partition),
- missingTopStateMap, resourceName, partition, clusterStatusMonitor, durationThreshold,
- stateModelDef.getTopState());
- } else if (lastTopStateInstance != null && !lastTopStateInstance
- .equals(currentTopStateInstance)) {
- // With no missing top state record, but top state instance changed,
- // we observed an entire top state handoff process
- reportSingleTopStateHandoff(cache, lastTopStateInstance, currentTopStateInstance,
- resourceName, partition, clusterStatusMonitor, lastPipelineFinishTimestamp);
- } else {
- // else, there is no top state change, or the top state first came up; do nothing
- LogUtil.logDebug(LOG, _eventId, String.format(
- "No top state hand off or first-seen top state for %s. CurNode: %s, LastNode: %s.",
- partition.getPartitionName(), currentTopStateInstance, lastTopStateInstance));
- }
- }
-
- /**
- * This function calculates duration of a full top state handoff, observed in 1 pipeline run,
- * i.e. current top state instance loaded from ZK is different than the one we cached during
- * last pipeline run.
- *
- * @param cache ClusterDataCache
- * @param lastTopStateInstance Name of last top state instance we cached
- * @param curTopStateInstance Name of current top state instance we refreshed from ZK
- * @param resourceName resource name
- * @param partition partition object
- * @param clusterStatusMonitor cluster state monitor object
- * @param lastPipelineFinishTimestamp last pipeline run finish timestamp
- */
- private void reportSingleTopStateHandoff(ClusterDataCache cache, String lastTopStateInstance,
- String curTopStateInstance, String resourceName, Partition partition,
- ClusterStatusMonitor clusterStatusMonitor, long lastPipelineFinishTimestamp) {
- if (curTopStateInstance.equals(lastTopStateInstance)) {
- return;
- }
-
- // Current state output generation logic guarantees that current top state instance
- // must be a live instance
- String curTopStateSession = cache.getLiveInstances().get(curTopStateInstance).getSessionId();
- long endTime =
- cache.getCurrentState(curTopStateInstance, curTopStateSession).get(resourceName)
- .getEndTime(partition.getPartitionName());
-
- long startTime = NOT_RECORDED;
- if (cache.getLiveInstances().containsKey(lastTopStateInstance)) {
- String lastTopStateSession =
- cache.getLiveInstances().get(lastTopStateInstance).getSessionId();
- // Make sure last top state instance has not bounced during cluster data cache refresh
- // We need this null check as there are test cases creating incomplete current state
- if (cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
- != null) {
- startTime =
- cache.getCurrentState(lastTopStateInstance, lastTopStateSession).get(resourceName)
- .getStartTime(partition.getPartitionName());
- }
- }
- if (startTime == NOT_RECORDED) {
- // either cached last top state instance is no longer alive, or it bounced during cluster
- // data cache refresh, we use last pipeline run end time for best guess. Though we can
- // calculate this number in a more precise way by refreshing data from ZK, given the rarity
- // of this corner case, it's not worthy.
- startTime = lastPipelineFinishTimestamp;
- }
-
- if (startTime == NOT_RECORDED || startTime > endTime) {
- // Top state handoff finished before end of last pipeline run, and instance contains
- // previous top state is no longer alive, so our best guess did not work, ignore the
- // data point for now.
- LogUtil.logWarn(LOG, _eventId, String
- .format("Cannot confirm top state missing start time. %s:%s->%s. Likely it was very fast",
- partition.getPartitionName(), lastTopStateInstance, curTopStateInstance));
- return;
- }
-
- LogUtil.logInfo(LOG, _eventId, String.format("Missing topstate duration is %d for partition %s",
- endTime - startTime, partition.getPartitionName()));
- if (clusterStatusMonitor != null) {
- clusterStatusMonitor
- .updateMissingTopStateDurationStats(resourceName, endTime - startTime,
- true);
- }
- }
-
- /**
- * Check if the given partition of the given resource has a missing top state duration larger
- * than the threshold, if so, report a top state transition failure
- *
- * @param cache cluster data cache
- * @param resourceName resource name
- * @param partition partition of the given resource
- * @param durationThreshold top state handoff duration threshold
- * @param clusterStatusMonitor monitor object
- */
- private void reportTopStateHandoffFailIfNecessary(ClusterDataCache cache, String resourceName,
- Partition partition, long durationThreshold, ClusterStatusMonitor clusterStatusMonitor) {
- Map<String, Map<String, Long>> missingTopStateMap = cache.getMissingTopStateMap();
- String partitionName = partition.getPartitionName();
- Long startTime = missingTopStateMap.get(resourceName).get(partitionName);
- if (startTime != null && startTime > 0
- && System.currentTimeMillis() - startTime > durationThreshold) {
- missingTopStateMap.get(resourceName).put(partitionName, TRANSITION_FAILED);
- if (clusterStatusMonitor != null) {
- clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, false);
- }
- }
- }
-
- /**
- * When we find a top state missing of the given partition, we find out when it started to miss
- * top state, then we record it in cache
- *
- * @param cache cluster data cache
- * @param missingTopStateMap missing top state record
- * @param lastTopStateMap our cached last top state locations
- * @param resourceName resource name
- * @param partition partition of the given resource
- * @param topState top state name
- * @param currentStateOutput current state output
- */
- private void reportTopStateMissing(ClusterDataCache cache,
- Map<String, Map<String, Long>> missingTopStateMap,
- Map<String, Map<String, String>> lastTopStateMap, String resourceName, Partition partition,
- String topState, CurrentStateOutput currentStateOutput) {
- if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
- .containsKey(partition.getPartitionName())) {
- // a previous missing has been already recorded
- return;
- }
-
- long startTime = NOT_RECORDED;
-
- // 1. try to find the previous topstate missing event for the startTime.
- String missingStateInstance = null;
- if (lastTopStateMap.containsKey(resourceName)) {
- missingStateInstance = lastTopStateMap.get(resourceName).get(partition.getPartitionName());
- }
-
- if (missingStateInstance != null) {
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- if (liveInstances.containsKey(missingStateInstance)) {
- CurrentState currentState = cache.getCurrentState(missingStateInstance,
- liveInstances.get(missingStateInstance).getSessionId()).get(resourceName);
-
- if (currentState != null
- && currentState.getPreviousState(partition.getPartitionName()) != null && currentState
- .getPreviousState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
- // Update the latest start time only for a transition from the top state to another state.
- // At the beginning, the start time is -1 (not recorded). If either the instance is not
- // alive or the instance has just started for that partition, Helix does not know
- // the previous start time or end time, so we count from the current time.
- //
- // The previous state being the top state does not mean that the resource has only one top state
- // (i.e. Online/Offline), so Helix has to use the latest start time as the starting point.
- startTime = Math.max(startTime, currentState.getStartTime(partition.getPartitionName()));
- } // Else no related state transition history found, use current time as the missing start time.
- } else {
- // If the previous topState holder is no longer alive, the offline time is used as start time.
- Map<String, Long> offlineMap = cache.getInstanceOfflineTimeMap();
- if (offlineMap.containsKey(missingStateInstance)) {
- startTime = Math.max(startTime, offlineMap.get(missingStateInstance));
- }
- }
- }
-
- // 2. if no previous topstate records, use any pending message that are created for topstate transition
- if (startTime == NOT_RECORDED) {
- for (Message message : currentStateOutput.getPendingMessageMap(resourceName, partition)
- .values()) {
- // Only messages that match the current session ID will be recorded in the map.
- // So no need to redundantly check here.
- if (message.getToState().equals(topState)) {
- startTime = Math.max(startTime, message.getCreateTimeStamp());
- }
- }
- }
-
- // 3. if no clue about previous topstate or any related pending message, use the current system time.
- if (startTime == NOT_RECORDED) {
- LogUtil.logWarn(LOG, _eventId,
- "Cannot confirm top state missing start time. Use the current system time as the start time.");
- startTime = System.currentTimeMillis();
- }
-
- if (!missingTopStateMap.containsKey(resourceName)) {
- missingTopStateMap.put(resourceName, new HashMap<String, Long>());
- }
-
- Map<String, Long> partitionMap = missingTopStateMap.get(resourceName);
- // Update the new partition without top state
- if (!partitionMap.containsKey(partition.getPartitionName())) {
- partitionMap.put(partition.getPartitionName(), startTime);
- }
- }
-
- /**
- * When we see a top state come back, i.e. we observe a top state in this pipeline run,
- * but have a top state missing record before, we need to remove the top state missing
- * record and report top state handoff duration
- *
- * @param cache cluster data cache
- * @param stateMap state map of the given partition of the given resource
- * @param missingTopStateMap missing top state record
- * @param resourceName resource name
- * @param partition partition of the resource
- * @param clusterStatusMonitor monitor object
- * @param threshold top state handoff threshold
- * @param topState name of the top state
- */
- private void reportTopStateComesBack(ClusterDataCache cache, Map<String, String> stateMap,
- Map<String, Map<String, Long>> missingTopStateMap, String resourceName, Partition partition,
- ClusterStatusMonitor clusterStatusMonitor, long threshold, String topState) {
-
- long handOffStartTime = missingTopStateMap.get(resourceName).get(partition.getPartitionName());
-
- // Find the earliest end time from the top states
- long handOffEndTime = System.currentTimeMillis();
- Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
- for (String instanceName : stateMap.keySet()) {
- CurrentState currentState =
- cache.getCurrentState(instanceName, liveInstances.get(instanceName).getSessionId())
- .get(resourceName);
- if (currentState.getState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
- handOffEndTime =
- Math.min(handOffEndTime, currentState.getEndTime(partition.getPartitionName()));
- }
- }
-
- if (handOffStartTime > 0 && handOffEndTime - handOffStartTime <= threshold) {
- LogUtil.logInfo(LOG, _eventId, String.format("Missing topstate duration is %d for partition %s",
- handOffEndTime - handOffStartTime, partition.getPartitionName()));
- if (clusterStatusMonitor != null) {
- clusterStatusMonitor
- .updateMissingTopStateDurationStats(resourceName, handOffEndTime - handOffStartTime,
- true);
- }
- }
- removeFromStatsMap(missingTopStateMap, resourceName, partition);
- }
-
- private void removeFromStatsMap(Map<String, Map<String, Long>> missingTopStateMap,
- String resourceName, Partition partition) {
- if (missingTopStateMap.containsKey(resourceName)) {
- missingTopStateMap.get(resourceName).remove(partition.getPartitionName());
- }
-
- if (missingTopStateMap.get(resourceName).size() == 0) {
- missingTopStateMap.remove(resourceName);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java
new file mode 100644
index 0000000..3aaf326
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MissingTopStateRecord.java
@@ -0,0 +1,58 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A record entry in cluster data cache containing information about a partition's
+ * missing top state
+ */
+public class MissingTopStateRecord {
+ private final boolean isGracefulHandoff;
+ private final long startTimeStamp;
+ private final long userLatency;
+ private boolean failed;
+
+ public MissingTopStateRecord(long start, long user, boolean graceful) {
+ isGracefulHandoff = graceful;
+ startTimeStamp = start;
+ userLatency = user;
+ failed = false;
+ }
+
+ /* package */ boolean isGracefulHandoff() {
+ return isGracefulHandoff;
+ }
+
+ /* package */ long getStartTimeStamp() {
+ return startTimeStamp;
+ }
+
+ /* package */ long getUserLatency() {
+ return userLatency;
+ }
+
+ /* package */ void setFailed() {
+ failed = true;
+ }
+
+ /* package */ boolean isFailed() {
+ return failed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
index 247d053..4f417fd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskGarbageCollectionStage.java
@@ -24,6 +24,14 @@ public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
public void execute(ClusterEvent event) {
ClusterDataCache clusterDataCache = event.getAttribute(AttributeName.ClusterDataCache.name());
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
+
+ if (clusterDataCache == null || manager == null) {
+ LOG.warn(
+ "ClusterDataCache or HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.",
+ event.getEventId(), event.getEventType(), event.getClusterName());
+ return;
+ }
+
Set<WorkflowConfig> existingWorkflows =
new HashSet<>(clusterDataCache.getWorkflowConfigMap().values());
for (WorkflowConfig workflowConfig : existingWorkflows) {
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
new file mode 100644
index 0000000..699f2a8
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TopStateHandoffReportStage.java
@@ -0,0 +1,506 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Observe top state handoff and report latency
+ * TODO: make this stage async
+ */
+public class TopStateHandoffReportStage extends AbstractBaseStage {
+  // User latency value used whenever the real latency of a transition cannot be determined.
+  private static final long DEFAULT_HANDOFF_USER_LATENCY = 0L;
+  // The logger is never reassigned, so it is declared final.
+  private static final Logger LOG = LoggerFactory.getLogger(TopStateHandoffReportStage.class);
+  // Sentinel meaning "no timestamp has been recorded".
+  public static final long TIMESTAMP_NOT_RECORDED = -1L;
+
+  /**
+   * Stage entry point. Stores the event id, reads the cluster data cache, resource map, current
+   * state output, cluster status monitor and last rebalance finish timestamp from the event,
+   * validates them and then delegates to {@code updateTopStateStatus}.
+   *
+   * @param event cluster event carrying the attributes read by this stage
+   * @throws Exception a {@link StageException} when ClusterDataCache, RESOURCES or CURRENT_STATE
+   *           is absent from the event, or when the cache is a task cache
+   */
+  @Override
+  public void process(ClusterEvent event) throws Exception {
+    _eventId = event.getEventId();
+
+    final ClusterDataCache dataCache = event.getAttribute(AttributeName.ClusterDataCache.name());
+    // Falls back to TIMESTAMP_NOT_RECORDED when the event carries no finish timestamp
+    final Long lastFinishTimestamp = event.getAttributeWithDefault(
+        AttributeName.LastRebalanceFinishTimeStamp.name(), TIMESTAMP_NOT_RECORDED);
+    final Map<String, Resource> resources = event.getAttribute(AttributeName.RESOURCES.name());
+    final CurrentStateOutput stateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE.name());
+    final ClusterStatusMonitor monitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
+
+    final boolean missingRequiredAttribute =
+        dataCache == null || resources == null || stateOutput == null;
+    if (missingRequiredAttribute) {
+      throw new StageException(
+          "Missing critical attributes for stage, requires ClusterDataCache, RESOURCES and CURRENT_STATE");
+    }
+
+    if (dataCache.isTaskCache()) {
+      throw new StageException("TopStateHandoffReportStage can only be used in resource pipeline");
+    }
+
+    // The monitor may be null; callees check for that before reporting
+    updateTopStateStatus(dataCache, monitor, resources, stateOutput, lastFinishTimestamp);
+  }
+
+  /**
+   * Walks every partition of every resource that has a state model definition. When a top state
+   * is currently present, reports its existence and refreshes the cached top state location;
+   * otherwise records the missing top state and, if necessary, reports a handoff failure.
+   * Finally calls {@code resetMaxMissingTopStateGauge()} on the monitor when one is provided.
+   *
+   * @param cache cluster data cache holding the missing-top-state and last-top-state maps
+   * @param clusterStatusMonitor monitor object, may be null
+   * @param resourceMap resources to inspect, keyed by resource name
+   * @param currentStateOutput current state output
+   * @param lastPipelineFinishTimestamp timestamp when last pipeline run finished
+   */
+  private void updateTopStateStatus(ClusterDataCache cache,
+      ClusterStatusMonitor clusterStatusMonitor, Map<String, Resource> resourceMap,
+      CurrentStateOutput currentStateOutput,
+      long lastPipelineFinishTimestamp) {
+    Map<String, Map<String, MissingTopStateRecord>> missingRecords =
+        cache.getMissingTopStateMap();
+    Map<String, Map<String, String>> lastLocations = cache.getLastTopStateLocationMap();
+
+    // Without a cluster config there is effectively no threshold
+    long durationThreshold = cache.getClusterConfig() == null
+        ? Long.MAX_VALUE
+        : cache.getClusterConfig().getMissTopStateDurationThreshold();
+
+    // Drop bookkeeping for resources that are no longer part of the resource map
+    missingRecords.keySet().retainAll(resourceMap.keySet());
+    lastLocations.keySet().retainAll(resourceMap.keySet());
+
+    for (Resource resource : resourceMap.values()) {
+      StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
+      if (stateModelDef == null) {
+        // No valid state model for this resource, nothing to observe
+        continue;
+      }
+
+      String resourceName = resource.getResourceName();
+
+      for (Partition partition : resource.getPartitions()) {
+        String currentHolder =
+            findCurrentTopStateLocation(currentStateOutput, resourceName, partition, stateModelDef);
+        String cachedHolder = findCachedTopStateLocation(cache, resourceName, partition);
+
+        if (currentHolder == null) {
+          // The missing record must be created before the failure check reads it
+          reportTopStateMissing(cache, resourceName, partition, stateModelDef.getTopState(),
+              currentStateOutput);
+          reportTopStateHandoffFailIfNecessary(cache, resourceName, partition, durationThreshold,
+              clusterStatusMonitor);
+          continue;
+        }
+
+        reportTopStateExistence(cache, currentStateOutput, stateModelDef, resourceName, partition,
+            cachedHolder, currentHolder, clusterStatusMonitor, durationThreshold,
+            lastPipelineFinishTimestamp);
+        updateCachedTopStateLocation(cache, resourceName, partition, currentHolder);
+      }
+    }
+
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.resetMaxMissingTopStateGauge();
+    }
+  }
+
+  /**
+   * Looks through the current state map of the given resource and partition and returns the
+   * first instance whose state equals the top state of the state model.
+   *
+   * @param currentStateOutput current state output
+   * @param resourceName resource name
+   * @param partition partition of the resource
+   * @param stateModelDef state model def object
+   * @return name of the node that holds the top state, null if no top state is recorded
+   */
+  private String findCurrentTopStateLocation(CurrentStateOutput currentStateOutput,
+      String resourceName, Partition partition, StateModelDefinition stateModelDef) {
+    Map<String, String> instanceStates =
+        currentStateOutput.getCurrentStateMap(resourceName, partition);
+    for (Map.Entry<String, String> instanceState : instanceStates.entrySet()) {
+      boolean holdsTopState = instanceState.getValue().equals(stateModelDef.getTopState());
+      if (holdsTopState) {
+        return instanceState.getKey();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Reads the top state location that was cached for the given resource and partition.
+   *
+   * @param cache cluster data cache object
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @return cached name of the node that holds the top state, null if nothing is cached
+   */
+  private String findCachedTopStateLocation(ClusterDataCache cache, String resourceName, Partition partition) {
+    Map<String, Map<String, String>> cachedLocations = cache.getLastTopStateLocationMap();
+    if (!cachedLocations.containsKey(resourceName)) {
+      return null;
+    }
+    // Map.get yields null for a partition that was never cached
+    Map<String, String> partitionToInstance = cachedLocations.get(resourceName);
+    return partitionToInstance.get(partition.getPartitionName());
+  }
+
+  /**
+   * Stores the current top state holder of the given resource and partition in the cache,
+   * creating the per-resource map on first use.
+   *
+   * @param cache cluster data cache object
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param currentTopStateInstance name of the instance that currently has the top state
+   */
+  private void updateCachedTopStateLocation(ClusterDataCache cache, String resourceName,
+      Partition partition, String currentTopStateInstance) {
+    Map<String, Map<String, String>> cachedLocations = cache.getLastTopStateLocationMap();
+    boolean resourceKnown = cachedLocations.containsKey(resourceName);
+    if (!resourceKnown) {
+      cachedLocations.put(resourceName, new HashMap<String, String>());
+    }
+    Map<String, String> partitionToInstance = cachedLocations.get(resourceName);
+    partitionToInstance.put(partition.getPartitionName(), currentTopStateInstance);
+  }
+
+  /**
+   * Called when a top state is observed for the given resource and partition. Two situations
+   * are reported:
+   * 1. Top state comes back, i.e. a missing top state record exists for the partition
+   * 2. Top state location change, i.e. no missing record exists but the current top state
+   * location differs from the cached one
+   * In every other case only a debug message is logged.
+   *
+   * @param cache cluster data cache
+   * @param currentStateOutput generated after computing current state
+   * @param stateModelDef State model definition object of the given resource
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param lastTopStateInstance our cached top state location
+   * @param currentTopStateInstance top state location we observed during this pipeline run
+   * @param clusterStatusMonitor monitor object
+   * @param durationThreshold top state handoff duration threshold
+   * @param lastPipelineFinishTimestamp timestamp when last pipeline run finished
+   */
+  private void reportTopStateExistence(ClusterDataCache cache, CurrentStateOutput currentStateOutput,
+      StateModelDefinition stateModelDef, String resourceName, Partition partition,
+      String lastTopStateInstance, String currentTopStateInstance,
+      ClusterStatusMonitor clusterStatusMonitor, long durationThreshold,
+      long lastPipelineFinishTimestamp) {
+
+    Map<String, Map<String, MissingTopStateRecord>> missingRecords =
+        cache.getMissingTopStateMap();
+
+    boolean hasMissingRecord = missingRecords.containsKey(resourceName)
+        && missingRecords.get(resourceName).containsKey(partition.getPartitionName());
+    if (hasMissingRecord) {
+      // A missing top state was recorded earlier and the top state is now back
+      reportTopStateComesBack(cache, currentStateOutput.getCurrentStateMap(resourceName, partition),
+          resourceName, partition, clusterStatusMonitor, durationThreshold,
+          stateModelDef.getTopState());
+      return;
+    }
+
+    boolean locationChanged =
+        lastTopStateInstance != null && !lastTopStateInstance.equals(currentTopStateInstance);
+    if (locationChanged) {
+      // No missing record but a different holder: the whole handoff happened between two
+      // pipeline runs
+      reportSingleTopStateHandoff(cache, lastTopStateInstance, currentTopStateInstance,
+          resourceName, partition, clusterStatusMonitor, lastPipelineFinishTimestamp);
+      return;
+    }
+
+    // Either the holder did not change or this is the first time a top state is seen
+    LogUtil.logDebug(LOG, _eventId, String.format(
+        "No top state hand off or first-seen top state for %s. CurNode: %s, LastNode: %s.",
+        partition.getPartitionName(), currentTopStateInstance, lastTopStateInstance));
+  }
+
+  /**
+   * Computes and reports the duration of a complete top state handoff observed within one
+   * pipeline run, i.e. the current top state instance differs from the one cached during the
+   * last pipeline run. The handoff is always reported as graceful and successful.
+   *
+   * @param cache ClusterDataCache
+   * @param lastTopStateInstance Name of last top state instance we cached
+   * @param curTopStateInstance Name of current top state instance we refreshed from ZK
+   * @param resourceName resource name
+   * @param partition partition object
+   * @param clusterStatusMonitor cluster state monitor object
+   * @param lastPipelineFinishTimestamp last pipeline run finish timestamp
+   */
+  private void reportSingleTopStateHandoff(ClusterDataCache cache, String lastTopStateInstance,
+      String curTopStateInstance, String resourceName, Partition partition,
+      ClusterStatusMonitor clusterStatusMonitor, long lastPipelineFinishTimestamp) {
+    if (curTopStateInstance.equals(lastTopStateInstance)) {
+      return;
+    }
+
+    String partitionName = partition.getPartitionName();
+    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+
+    // Current state output generation logic guarantees that current top state instance
+    // must be a live instance
+    String curSession = liveInstances.get(curTopStateInstance).getSessionId();
+    CurrentState curState =
+        cache.getCurrentState(curTopStateInstance, curSession).get(resourceName);
+    long endTime = curState.getEndTime(partitionName);
+    long toTopStateUserLatency = endTime - curState.getStartTime(partitionName);
+
+    long startTime = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
+    long fromTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+
+    // Only trust the previous holder if it has not bounced during cluster data cache refresh
+    if (liveInstances.containsKey(lastTopStateInstance)) {
+      String lastSession = liveInstances.get(lastTopStateInstance).getSessionId();
+      CurrentState lastState =
+          cache.getCurrentState(lastTopStateInstance, lastSession).get(resourceName);
+      // Some test cases create incomplete current state, hence the null check
+      if (lastState != null) {
+        startTime = lastState.getStartTime(partitionName);
+        fromTopStateUserLatency = lastState.getEndTime(partitionName) - startTime;
+      }
+    }
+
+    if (startTime == TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED) {
+      // The cached previous holder is gone, bounced during the refresh, or has no recorded start
+      // time. Fall back to the end of the last pipeline run as a best guess; a more precise
+      // value would need another read from ZK, which is not worth it for this rare case.
+      startTime = lastPipelineFinishTimestamp;
+      fromTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+    }
+
+    boolean startUnknown = startTime == TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
+    if (startUnknown || startTime > endTime) {
+      // The best guess did not produce a usable start time (e.g. the handoff finished before the
+      // end of the last pipeline run), so this data point is dropped.
+      LogUtil.logWarn(LOG, _eventId, String
+          .format("Cannot confirm top state missing start time. %s:%s->%s. Likely it was very fast",
+              partitionName, lastTopStateInstance, curTopStateInstance));
+      return;
+    }
+
+    long duration = endTime - startTime;
+    long userLatency = fromTopStateUserLatency + toTopStateUserLatency;
+    // A handoff caused by an instance crash cannot be observed end-to-end within one pipeline
+    // run, so a handoff seen here is always treated as graceful
+    logMissingTopStateInfo(duration, userLatency, true, partitionName);
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor
+          .updateMissingTopStateDurationStats(resourceName, duration, userLatency, true, true);
+    }
+  }
+
+  /**
+   * Marks the missing top state record of the given partition as failed and reports a failed
+   * handoff to the monitor, once the top state has been missing for longer than the threshold.
+   * A record that is already marked as failed is not reported again.
+   *
+   * @param cache cluster data cache
+   * @param resourceName resource name
+   * @param partition partition of the given resource
+   * @param durationThreshold top state handoff duration threshold
+   * @param clusterStatusMonitor monitor object
+   */
+  private void reportTopStateHandoffFailIfNecessary(ClusterDataCache cache, String resourceName,
+      Partition partition, long durationThreshold, ClusterStatusMonitor clusterStatusMonitor) {
+    Map<String, Map<String, MissingTopStateRecord>> missingRecords =
+        cache.getMissingTopStateMap();
+    String partitionName = partition.getPartitionName();
+    Map<String, MissingTopStateRecord> resourceRecords = missingRecords.get(resourceName);
+    MissingTopStateRecord record = resourceRecords.get(partitionName);
+
+    long missingSince = record.getStartTimeStamp();
+    boolean overThreshold =
+        missingSince > 0 && System.currentTimeMillis() - missingSince > durationThreshold;
+    if (!overThreshold || record.isFailed()) {
+      return;
+    }
+
+    record.setFailed();
+    resourceRecords.put(partitionName, record);
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.updateMissingTopStateDurationStats(resourceName, 0L, 0L, false, false);
+    }
+  }
+
+ /**
+ * Records the start of a missing top state period for the given partition. If a record already
+ * exists for this resource and partition, nothing is done. Otherwise the start time is resolved
+ * in three steps, each one used only when the previous steps left it unrecorded:
+ * 1. from the cached last top state holder: its current state start time when it is still live
+ * and its previous state was the top state, or its offline time when it is no longer live
+ * (in which case the handoff is marked as non-graceful);
+ * 2. from the create timestamp of pending messages whose target state is the top state;
+ * 3. from the current system time.
+ * The resulting record is stored in the cache's missing top state map.
+ *
+ * @param cache cluster data cache
+ * @param resourceName resource name
+ * @param partition partition of the given resource
+ * @param topState top state name
+ * @param currentStateOutput current state output
+ */
+ private void reportTopStateMissing(ClusterDataCache cache, String resourceName, Partition partition,
+ String topState, CurrentStateOutput currentStateOutput) {
+ Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap = cache.getMissingTopStateMap();
+ Map<String, Map<String, String>> lastTopStateMap = cache.getLastTopStateLocationMap();
+ if (missingTopStateMap.containsKey(resourceName) && missingTopStateMap.get(resourceName)
+ .containsKey(partition.getPartitionName())) {
+ // a previous missing has been already recorded
+ return;
+ }
+
+ long startTime = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
+ long fromTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+ boolean isGraceful = true;
+
+ // 1. Try to derive the start time from the cached last top state holder.
+ String missingStateInstance = null;
+ if (lastTopStateMap.containsKey(resourceName)) {
+ missingStateInstance = lastTopStateMap.get(resourceName).get(partition.getPartitionName());
+ }
+
+ if (missingStateInstance != null) {
+ Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+ if (liveInstances.containsKey(missingStateInstance)) {
+ CurrentState currentState = cache.getCurrentState(missingStateInstance,
+ liveInstances.get(missingStateInstance).getSessionId()).get(resourceName);
+
+ if (currentState != null
+ && currentState.getPreviousState(partition.getPartitionName()) != null && currentState
+ .getPreviousState(partition.getPartitionName()).equalsIgnoreCase(topState)) {
+
+ // Update the latest start time only for a transition from the top state to another state.
+ // At the beginning, the start time is -1 (not recorded). If the instance is not alive, or
+ // the instance just started for that partition, Helix does not know the previous start
+ // time or end time, so we count from the current time.
+ //
+ // The previous state being the top state does not mean that the resource has only one
+ // top state (e.g. Online/Offline), so Helix has to use the latest start time as the
+ // starting point.
+ long fromTopStateStartTime = currentState.getStartTime(partition.getPartitionName());
+ if (fromTopStateStartTime > startTime) {
+ startTime = fromTopStateStartTime;
+ fromTopStateUserLatency =
+ currentState.getEndTime(partition.getPartitionName()) - startTime;
+ }
+ // NOTE(review): after the branch above startTime is already >= the current state start
+ // time, so this max looks redundant -- confirm before removing.
+ startTime = Math.max(startTime, currentState.getStartTime(partition.getPartitionName()));
+ } // Else no related state transition history found, use current time as the missing start time.
+ } else {
+ // If the previous topState holder is no longer alive, the offline time is used as start time.
+ // Also, if we observe a top state missing and the previous top state node is gone, the
+ // top state handoff is not graceful
+ isGraceful = false;
+ Map<String, Long> offlineMap = cache.getInstanceOfflineTimeMap();
+ if (offlineMap.containsKey(missingStateInstance)) {
+ startTime = Math.max(startTime, offlineMap.get(missingStateInstance));
+ }
+ }
+ }
+
+ // 2. If there is no previous top state record, either the resource was just created or there
+ // was a controller leadership change. Check any pending messages that were created for a top
+ // state transition. Assume this is a graceful top state handoff, because if the from-top-state
+ // instance had crashed, we would not be recording such a message.
+ if (startTime == TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED) {
+ for (Message message : currentStateOutput.getPendingMessageMap(resourceName, partition)
+ .values()) {
+ // Only messages that match the current session ID will be recorded in the map.
+ // So no need to redundantly check here.
+ if (message.getToState().equals(topState)) {
+ startTime = Math.max(startTime, message.getCreateTimeStamp());
+ }
+ }
+ }
+
+ // 3. If there is no clue about the previous top state or any related pending message, it could be
+ // a. resource just created
+ // b. controller leader switch (actual hand off could be either graceful or non graceful)
+ //
+ // Use the current system time as the missing top state start time and always assume graceful
+ // TODO: revise this case and see if this case can be better addressed
+ if (startTime == TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED) {
+ LogUtil.logWarn(LOG, _eventId,
+ "Cannot confirm top state missing start time. Use the current system time as the start time.");
+ startTime = System.currentTimeMillis();
+ }
+
+ // Create the per-resource map on first use, then store the new record for this partition
+ if (!missingTopStateMap.containsKey(resourceName)) {
+ missingTopStateMap.put(resourceName, new HashMap<String, MissingTopStateRecord>());
+ }
+
+ missingTopStateMap.get(resourceName).put(partition.getPartitionName(),
+ new MissingTopStateRecord(startTime, fromTopStateUserLatency, isGraceful));
+ }
+
+  /**
+   * Handles a top state that is observed again after a missing top state record was stored for
+   * the partition: reports the handoff duration when it can be determined and is within the
+   * threshold, and in all cases removes the missing top state record.
+   *
+   * @param cache cluster data cache
+   * @param stateMap state map of the given partition of the given resource
+   * @param resourceName resource name
+   * @param partition partition of the resource
+   * @param clusterStatusMonitor monitor object
+   * @param threshold top state handoff threshold
+   * @param topState name of the top state
+   */
+  private void reportTopStateComesBack(ClusterDataCache cache, Map<String, String> stateMap, String resourceName,
+      Partition partition, ClusterStatusMonitor clusterStatusMonitor, long threshold,
+      String topState) {
+    String partitionName = partition.getPartitionName();
+    Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap =
+        cache.getMissingTopStateMap();
+    MissingTopStateRecord record = missingTopStateMap.get(resourceName).get(partitionName);
+    long handOffStartTime = record.getStartTimeStamp();
+    long fromTopStateUserLatency = record.getUserLatency();
+
+    // Pick the earliest end time among the top state holders, with its matching user latency
+    long handOffEndTime = Long.MAX_VALUE;
+    long toTopStateUserLatency = DEFAULT_HANDOFF_USER_LATENCY;
+    Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+    for (String instanceName : stateMap.keySet()) {
+      String sessionId = liveInstances.get(instanceName).getSessionId();
+      CurrentState currentState = cache.getCurrentState(instanceName, sessionId).get(resourceName);
+      if (!currentState.getState(partitionName).equalsIgnoreCase(topState)) {
+        continue;
+      }
+      long endTime = currentState.getEndTime(partitionName);
+      if (endTime <= handOffEndTime) {
+        handOffEndTime = endTime;
+        toTopStateUserLatency = handOffEndTime - currentState.getStartTime(partitionName);
+      }
+    }
+
+    boolean reportable = handOffStartTime > 0 && handOffEndTime - handOffStartTime <= threshold;
+    if (reportable) {
+      long userLatency = fromTopStateUserLatency + toTopStateUserLatency;
+      // After a controller leader switch the previous master information may be lost and the
+      // start time approximated with the current time. If the user latency exceeds the estimated
+      // duration, the user latency is used as the duration.
+      long duration = Math.max(handOffEndTime - handOffStartTime, userLatency);
+      boolean isGraceful = record.isGracefulHandoff();
+      logMissingTopStateInfo(duration, userLatency, isGraceful, partitionName);
+
+      if (clusterStatusMonitor != null) {
+        clusterStatusMonitor
+            .updateMissingTopStateDurationStats(resourceName, duration, userLatency, isGraceful,
+                true);
+      }
+    }
+    removeFromStatsMap(missingTopStateMap, resourceName, partition);
+  }
+
+  /**
+   * Removes the missing top state record of the given partition and drops the per-resource map
+   * once it becomes empty. Does nothing when the resource has no entry in the map; the size
+   * check used to run outside the containsKey guard and threw a NullPointerException then.
+   *
+   * @param missingTopStateMap missing top state records keyed by resource name, then partition name
+   * @param resourceName resource name
+   * @param partition partition whose record should be removed
+   */
+  private void removeFromStatsMap(
+      Map<String, Map<String, MissingTopStateRecord>> missingTopStateMap, String resourceName,
+      Partition partition) {
+    Map<String, MissingTopStateRecord> partitionRecords = missingTopStateMap.get(resourceName);
+    if (partitionRecords == null) {
+      // Nothing recorded for this resource, so there is nothing to remove
+      return;
+    }
+
+    partitionRecords.remove(partition.getPartitionName());
+
+    if (partitionRecords.isEmpty()) {
+      missingTopStateMap.remove(resourceName);
+    }
+  }
+
+  /**
+   * Logs, at info level, the user latency and total duration of a missing top state period.
+   *
+   * @param totalDuration total missing top state duration
+   * @param userLatency user latency part of the handoff
+   * @param isGraceful whether the handoff is considered graceful
+   * @param partitionName name of the partition the numbers belong to
+   */
+  private void logMissingTopStateInfo(long totalDuration, long userLatency, boolean isGraceful,
+      String partitionName) {
+    // The message prints user latency first, then total duration
+    String message = String.format(
+        "Missing top state duration is %s/%s for partition %s. Graceful: %s", userLatency,
+        totalDuration, partitionName, isGraceful);
+    LogUtil.logInfo(LOG, _eventId, message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index e3655d8..f870ddc 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -22,6 +22,21 @@ package org.apache.helix.monitoring.mbeans;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -38,22 +53,6 @@ import org.apache.helix.task.WorkflowContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusMonitor.class);
@@ -446,11 +445,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
}
}
- public synchronized void updateMissingTopStateDurationStats(String resourceName, long duration, boolean succeeded) {
+ public synchronized void updateMissingTopStateDurationStats(String resourceName,
+ long totalDuration, long userLatency, boolean isGraceful, boolean succeeded) {
ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
if (resourceMonitor != null) {
- resourceMonitor.updateStateHandoffStats(ResourceMonitor.MonitorState.TOP_STATE, duration, succeeded);
+ resourceMonitor.updateStateHandoffStats(ResourceMonitor.MonitorState.TOP_STATE, totalDuration,
+ userLatency, isGraceful, succeeded);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index a103a0c..c3dd242 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -21,7 +21,15 @@ package org.apache.helix.monitoring.mbeans;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import javax.management.ObjectName;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -31,10 +39,6 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
-import javax.management.JMException;
-import javax.management.ObjectName;
-import java.util.*;
-
public class ResourceMonitor extends DynamicMBeanProvider {
// Gauges
@@ -60,6 +64,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
// Histograms
private HistogramDynamicMetric _partitionTopStateHandoffDurationGauge;
+ private HistogramDynamicMetric _partitionTopStateHandoffUserLatencyGauge;
+ private HistogramDynamicMetric _partitionTopStateNonGracefulHandoffDurationGauge;
private String _tag = ClusterStatusMonitor.DEFAULT_TAG;
private long _lastResetTime;
@@ -86,6 +92,8 @@ public class ResourceMonitor extends DynamicMBeanProvider {
attributeList.add(_failedTopStateHandoffCounter);
attributeList.add(_maxSinglePartitionTopStateHandoffDuration);
attributeList.add(_partitionTopStateHandoffDurationGauge);
+ attributeList.add(_partitionTopStateHandoffUserLatencyGauge);
+ attributeList.add(_partitionTopStateNonGracefulHandoffDurationGauge);
attributeList.add(_totalMessageReceived);
attributeList.add(_numPendingStateTransitions);
doRegister(attributeList, _initObjectName);
@@ -96,6 +104,7 @@ public class ResourceMonitor extends DynamicMBeanProvider {
TOP_STATE
}
+ @SuppressWarnings("unchecked")
public ResourceMonitor(String clusterName, String resourceName, ObjectName objectName) {
_clusterName = clusterName;
_resourceName = resourceName;
@@ -122,6 +131,14 @@ public class ResourceMonitor extends DynamicMBeanProvider {
_partitionTopStateHandoffDurationGauge =
new HistogramDynamicMetric("PartitionTopStateHandoffDurationGauge", new Histogram(
new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+
+ _partitionTopStateHandoffUserLatencyGauge =
+ new HistogramDynamicMetric("PartitionTopStateHandoffUserLatencyGauge", new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ _partitionTopStateNonGracefulHandoffDurationGauge =
+ new HistogramDynamicMetric("PartitionTopStateNonGracefulHandoffGauge", new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+
_totalMessageReceived = new SimpleDynamicMetric("TotalMessageReceived", 0L);
_maxSinglePartitionTopStateHandoffDuration =
new SimpleDynamicMetric("MaxSinglePartitionTopStateHandoffDurationGauge", 0L);
@@ -165,6 +182,18 @@ public class ResourceMonitor extends DynamicMBeanProvider {
return _maxSinglePartitionTopStateHandoffDuration.getValue();
}
+ public HistogramDynamicMetric getPartitionTopStateHandoffDurationGauge() {
+ return _partitionTopStateHandoffDurationGauge;
+ }
+
+ public HistogramDynamicMetric getPartitionTopStateNonGracefulHandoffDurationGauge() {
+ return _partitionTopStateNonGracefulHandoffDurationGauge;
+ }
+
+ public HistogramDynamicMetric getPartitionTopStateHandoffUserLatencyGauge() {
+ return _partitionTopStateHandoffUserLatencyGauge;
+ }
+
public long getFailedTopStateHandoffCounter() {
return _failedTopStateHandoffCounter.getValue();
}
@@ -310,25 +339,31 @@ public class ResourceMonitor extends DynamicMBeanProvider {
_numPendingStateTransitions.updateValue((long) messageCount);
}
- public void updateStateHandoffStats(MonitorState monitorState, long duration, boolean succeeded) {
+ public void updateStateHandoffStats(MonitorState monitorState, long totalDuration,
+ long userLatency, boolean isGraceful, boolean succeeded) {
switch (monitorState) {
- case TOP_STATE:
- if (succeeded) {
- _successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue() + 1);
- _successfulTopStateHandoffDurationCounter
- .updateValue(_successfulTopStateHandoffDurationCounter.getValue() + duration);
- _partitionTopStateHandoffDurationGauge.updateValue(duration);
- if (duration > _maxSinglePartitionTopStateHandoffDuration.getValue()) {
- _maxSinglePartitionTopStateHandoffDuration.updateValue(duration);
- _lastResetTime = System.currentTimeMillis();
- }
+ case TOP_STATE:
+ if (succeeded) {
+ _successTopStateHandoffCounter.updateValue(_successTopStateHandoffCounter.getValue() + 1);
+ _successfulTopStateHandoffDurationCounter
+ .updateValue(_successfulTopStateHandoffDurationCounter.getValue() + totalDuration);
+ if (isGraceful) {
+ _partitionTopStateHandoffDurationGauge.updateValue(totalDuration);
+ _partitionTopStateHandoffUserLatencyGauge.updateValue(userLatency);
} else {
- _failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue() + 1);
+ _partitionTopStateNonGracefulHandoffDurationGauge.updateValue(totalDuration);
}
- break;
- default:
- _logger.warn(
- String.format("Wrong monitor state \"%s\" that not supported ", monitorState.name()));
+ if (totalDuration > _maxSinglePartitionTopStateHandoffDuration.getValue()) {
+ _maxSinglePartitionTopStateHandoffDuration.updateValue(totalDuration);
+ _lastResetTime = System.currentTimeMillis();
+ }
+ } else {
+ _failedTopStateHandoffCounter.updateValue(_failedTopStateHandoffCounter.getValue() + 1);
+ }
+ break;
+ default:
+ _logger.warn(
+ String.format("Wrong monitor state \"%s\" that not supported ", monitorState.name()));
}
}
@@ -379,4 +414,4 @@ public class ResourceMonitor extends DynamicMBeanProvider {
_lastResetTime = System.currentTimeMillis();
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 0225f83..ea529e8 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -861,7 +861,7 @@ public class TaskDriver {
if (ctx == null || !allowedStates.contains(ctx.getWorkflowState())) {
throw new HelixException(String.format(
"Workflow \"%s\" context is empty or not in states: \"%s\", current state: \"%s\"",
- workflowName, targetStates.toString(),
+ workflowName, Arrays.asList(targetStates),
ctx == null ? "null" : ctx.getWorkflowState().toString()));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7e49f995/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 7495078..5509858 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -173,42 +173,56 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
@Test
public void testReassignment() throws Exception {
- final int NUM_INSTANCES = 5;
- String jobName = TestHelper.getTestMethodName();
- Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
+ String workflowName = TestHelper.getTestMethodName();
+ String jobNameSuffix = "job";
+ String jobName = String.format("%s_%s", workflowName, jobNameSuffix);
+ Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
+
+ // This is error prone as ThreadCountBasedTaskAssigner will always re-assign the
+ // task to the same instance, given we only have 1 task to assign and the order of
+ // iterating all nodes during assignment is always the same. Rarely, some change
+ // will alter the order of iteration during assignment, so we need to change
+ // this instance name to keep on testing this functionality.
+ final String failInstance = "localhost_12919";
Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true,
- "failInstance", PARTICIPANT_PREFIX + '_' + (_startPort + 1)));
+ "failInstance", failInstance));
TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap);
taskConfigs.add(taskConfig1);
Map<String, String> jobCommandMap = Maps.newHashMap();
jobCommandMap.put("Timeout", "1000");
- JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand")
- .addTaskConfigs(taskConfigs).setJobCommandConfigMap(jobCommandMap);
- workflowBuilder.addJob(jobName, jobBuilder);
+ // Retry forever
+ JobConfig.Builder jobBuilder =
+ new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+ .setJobCommandConfigMap(jobCommandMap).setMaxAttemptsPerTask(Integer.MAX_VALUE);
+ workflowBuilder.addJob(jobNameSuffix, jobBuilder);
_driver.start(workflowBuilder.build());
- // Ensure the job completes
- _driver.pollForWorkflowState(jobName, TaskState.IN_PROGRESS);
- _driver.pollForWorkflowState(jobName, TaskState.COMPLETED);
+ // Poll to ensure that the task gets re-attempted first
+ int trial = 0;
+ while (trial < 1000) { // 100 sec
+ JobContext jctx = _driver.getJobContext(jobName);
+ if (jctx != null && jctx.getPartitionNumAttempts(0) > 1) {
+ break;
+ }
+ Thread.sleep(100);
+ trial += 1;
+ }
+
+ if (trial == 1000) {
+ Assert.fail("Job " + jobName + " is not retried");
+ }
+
+ // disable failed instance
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, failInstance, false);
+ _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
// Ensure that the class was invoked
Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName()));
-
- // Ensure that this was tried on two different instances, the first of which exhausted the
- // attempts number, and the other passes on the first try -> See below
-
- // TEST FIX: After quota-based scheduling support, we use a different assignment strategy (not
- // consistent hashing), which does not necessarily guarantee that failed tasks will be assigned
- // on a different instance. The parameters for this test are adjusted accordingly
- // Also, hard-coding the instance name (line 184) is not a reliable way of testing whether
- // re-assignment took place, so this test is no longer valid and will always pass
- Assert.assertEquals(_runCounts.size(), 1);
- // Assert.assertTrue(
- // _runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / NUM_INSTANCES));
- Assert.assertTrue(_runCounts.values().contains(1));
+ Assert.assertNotSame(_driver.getJobContext(jobName).getAssignedParticipant(0), failInstance);
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, failInstance, true);
}
@Test