You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/20 05:40:05 UTC

[09/18] flink git commit: [FLINK-5424] Improve Restart Strategy Logging

[FLINK-5424] Improve Restart Strategy Logging

- Added toString for FailureRateRestartStrategy (important for
JobManager's "Using restart strategy $restartStrategy" log message)
- Added an explanation to the log message when a job cannot be
restarted, stating whether a SuppressRestartsException or the restart
strategy prevented the restart

This closes #3079.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c210ff37
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c210ff37
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c210ff37

Branch: refs/heads/master
Commit: c210ff37a4c55c835f76c031f2d9cf18165812aa
Parents: c93e04c
Author: Shannon Carey <re...@gmail.com>
Authored: Fri Jan 6 13:25:52 2017 -0600
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 19 23:57:21 2017 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java      | 16 ++++++++++++++--
 .../restart/FailureRateRestartStrategy.java         | 10 ++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c210ff37/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 462fb20..9092bae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -1064,7 +1065,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					LOG.info("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID());
 				}
 
-				boolean isRestartable = !(failureCause instanceof SuppressRestartsException) && restartStrategy.canRestart();
+				final boolean isFailureCauseAllowingRestart = !(failureCause instanceof SuppressRestartsException);
+				final boolean isRestartStrategyAllowingRestart = restartStrategy.canRestart();
+				boolean isRestartable = isFailureCauseAllowingRestart && isRestartStrategyAllowingRestart;
 
 				if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
 					LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
@@ -1072,7 +1075,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 					return true;
 				} else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
-					LOG.info("Could not restart the job {} ({}).", getJobName(), getJobID(), failureCause);
+					final List<String> reasonsForNoRestart = new ArrayList<>(2);
+					if (!isFailureCauseAllowingRestart) {
+						reasonsForNoRestart.add("a type of SuppressRestartsException was thrown");
+					}
+					if (!isRestartStrategyAllowingRestart) {
+						reasonsForNoRestart.add("the restart strategy prevented it");
+					}
+
+					LOG.info("Could not restart the job {} ({}) because {}.", getJobName(), getJobID(),
+						StringUtils.join(reasonsForNoRestart, " and "), failureCause);
 					postRunCleanup();
 
 					return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/c210ff37/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index 10546a2..d95e1c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.Preconditions;
+
 import scala.concurrent.duration.Duration;
 
 import java.util.ArrayDeque;
@@ -77,6 +78,15 @@ public class FailureRateRestartStrategy implements RestartStrategy {
 		return restartTimestampsDeque.size() == maxFailuresPerInterval;
 	}
 
+	@Override
+	public String toString() { // feeds JobManager's "Using restart strategy ..." log line; fields comma-separated
+		return "FailureRateRestartStrategy(" +
+			"failuresInterval=" + failuresInterval +
+			", delayInterval=" + delayInterval +
+			", maxFailuresPerInterval=" + maxFailuresPerInterval +
+			")";
+	}
+
 	public static FailureRateRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
 		int maxFailuresPerInterval = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
 		String failuresIntervalString = configuration.getString(