You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/01/20 05:40:05 UTC
[09/18] flink git commit: [FLINK-5424] Improve Restart Strategy Logging
[FLINK-5424] Improve Restart Strategy Logging
- Added toString for FailureRateRestartStrategy (important for
JobManager's "Using restart strategy $restartStrategy" log message)
- Added explanation in log when the restart strategy is responsible
for preventing job restart
This closes #3079.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c210ff37
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c210ff37
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c210ff37
Branch: refs/heads/master
Commit: c210ff37a4c55c835f76c031f2d9cf18165812aa
Parents: c93e04c
Author: Shannon Carey <re...@gmail.com>
Authored: Fri Jan 6 13:25:52 2017 -0600
Committer: zentol <ch...@apache.org>
Committed: Thu Jan 19 23:57:21 2017 +0100
----------------------------------------------------------------------
.../runtime/executiongraph/ExecutionGraph.java | 16 ++++++++++++++--
.../restart/FailureRateRestartStrategy.java | 10 ++++++++++
2 files changed, 24 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c210ff37/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 462fb20..9092bae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
@@ -1064,7 +1065,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
LOG.info("Try to restart or fail the job {} ({}) if no longer possible.", getJobName(), getJobID());
}
- boolean isRestartable = !(failureCause instanceof SuppressRestartsException) && restartStrategy.canRestart();
+ final boolean isFailureCauseAllowingRestart = !(failureCause instanceof SuppressRestartsException);
+ final boolean isRestartStrategyAllowingRestart = restartStrategy.canRestart();
+ boolean isRestartable = isFailureCauseAllowingRestart && isRestartStrategyAllowingRestart;
if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
@@ -1072,7 +1075,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return true;
} else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
- LOG.info("Could not restart the job {} ({}).", getJobName(), getJobID(), failureCause);
+ final List<String> reasonsForNoRestart = new ArrayList<>(2);
+ if (!isFailureCauseAllowingRestart) {
+ reasonsForNoRestart.add("a type of SuppressRestartsException was thrown");
+ }
+ if (!isRestartStrategyAllowingRestart) {
+ reasonsForNoRestart.add("the restart strategy prevented it");
+ }
+
+ LOG.info("Could not restart the job {} ({}) because {}.", getJobName(), getJobID(),
+ StringUtils.join(reasonsForNoRestart, " and "), failureCause);
postRunCleanup();
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/c210ff37/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index 10546a2..d95e1c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.util.Preconditions;
+
import scala.concurrent.duration.Duration;
import java.util.ArrayDeque;
@@ -77,6 +78,15 @@ public class FailureRateRestartStrategy implements RestartStrategy {
return restartTimestampsDeque.size() == maxFailuresPerInterval;
}
+ @Override
+ public String toString() { // e.g. "FailureRateRestartStrategy(failuresInterval=..., delayInterval=..., maxFailuresPerInterval=...)"
+ return "FailureRateRestartStrategy(" +
+ "failuresInterval=" + failuresInterval +
+ ", delayInterval=" + delayInterval + // ", " separators keep adjacent field values from running together in logs
+ ", maxFailuresPerInterval=" + maxFailuresPerInterval +
+ ")";
+ }
+
public static FailureRateRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
int maxFailuresPerInterval = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
String failuresIntervalString = configuration.getString(