You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/04 13:57:41 UTC
flink git commit: [FLINK-4596] Fallback restart strategy config to
let jobs choose restart configuration set at cluster level
Repository: flink
Updated Branches:
refs/heads/master 852c5298e -> ac3997927
[FLINK-4596] Fallback restart strategy config to let jobs choose restart configuration set at cluster level
Added Javadoc for the fallback restart strategy
This closes #2592.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac399792
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac399792
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac399792
Branch: refs/heads/master
Commit: ac3997927e38f9da45de0b001d308430c323c842
Parents: 852c529
Author: Nagarjun <ng...@netflix.com>
Authored: Wed Oct 5 00:26:05 2016 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Nov 4 14:56:56 2016 +0100
----------------------------------------------------------------------
docs/setup/fault_tolerance.md | 6 ++++++
.../common/restartstrategy/RestartStrategies.java | 18 ++++++++++++++++++
.../restart/RestartStrategyFactory.java | 2 ++
.../flink/runtime/jobmanager/JobManager.scala | 7 ++++---
4 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/docs/setup/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/setup/fault_tolerance.md b/docs/setup/fault_tolerance.md
index 20f68ee..fa1c821 100644
--- a/docs/setup/fault_tolerance.md
+++ b/docs/setup/fault_tolerance.md
@@ -457,4 +457,10 @@ env.setRestartStrategy(RestartStrategies.noRestart())
</div>
</div>
+### Fallback Restart Strategy
+
+Jobs configured with `RestartStrategies.fallBackRestart()` use the restart strategy defined at the cluster level (e.g. in `flink-conf.yaml`).
+This is helpful for streaming programs which enable checkpointing.
+By default, a fixed delay restart strategy is chosen if no other restart strategy is defined.
+
{% top %}
http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
index d5db466..7073c2c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@@ -42,6 +42,10 @@ public class RestartStrategies {
return new NoRestartStrategyConfiguration();
}
+ /**
+ * Generates a FallbackRestartStrategyConfiguration. A job using this configuration does not
+ * define its own restart strategy and instead falls back to the restart strategy configured
+ * at the cluster level.
+ *
+ * @return FallbackRestartStrategyConfiguration
+ */
+ public static RestartStrategyConfiguration fallBackRestart() {
+ return new FallbackRestartStrategyConfiguration();
+ }
+
/**
* Generates a FixedDelayRestartStrategyConfiguration.
*
@@ -173,4 +177,18 @@ public class RestartStrategies {
+ " and fixed delay " + delayBetweenAttemptsInterval.toString();
}
}
+
+ /**
+ * Restart strategy configuration that lets a job fall back to the restart strategy configured
+ * at the cluster level. This is especially useful when a custom restart strategy
+ * implementation is configured via flink-conf.yaml.
+ *
+ * <p>This configuration carries no parameters of its own. In this change,
+ * {@code RestartStrategyFactory.createRestartStrategy} maps it to {@code null}, and the
+ * JobManager treats that {@code null} as "use the cluster-level restart strategy factory".
+ */
+ final public static class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration{
+ private static final long serialVersionUID = -4441787204284085544L;
+
+ /** Describes this configuration as the cluster-level default restart strategy. */
+ @Override
+ public String getDescription() {
+ return "Cluster level default restart strategy";
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index 870bf63..27ee9b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -66,6 +66,8 @@ public abstract class RestartStrategyFactory implements Serializable {
config.getFailureInterval(),
config.getDelayBetweenAttemptsInterval()
);
+ } else if (restartStrategyConfiguration instanceof RestartStrategies.FallbackRestartStrategyConfiguration) {
+ return null;
} else {
throw new IllegalArgumentException("Unknown restart strategy configuration " +
restartStrategyConfiguration + ".");
http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9cc8be6..3f0689f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1221,7 +1221,8 @@ class JobManager(
Option(jobGraph.getSerializedExecutionConfig()
.deserializeValue(userCodeLoader)
.getRestartStrategy())
- .map(RestartStrategyFactory.createRestartStrategy) match {
+ .map(RestartStrategyFactory.createRestartStrategy)
+ .filter(p => p != null) match {
case Some(strategy) => strategy
case None => restartStrategyFactory.createRestartStrategy()
}
@@ -1534,7 +1535,7 @@ class JobManager(
case _ => unhandled(actorMsg)
}
}
-
+
/**
* Handle unmatched messages with an exception.
*/
@@ -2607,7 +2608,7 @@ object JobManager {
case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
case None => actorSystem.actorOf(jobManagerProps)
}
-
+
metricsRegistry match {
case Some(registry) =>
registry.startQueryService(actorSystem, null)