You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/04 13:57:41 UTC

flink git commit: [FLINK-4596] Fallback restart strategy config to let jobs choose restart configuration set at cluster level

Repository: flink
Updated Branches:
  refs/heads/master 852c5298e -> ac3997927


[FLINK-4596] Fallback restart strategy config to let jobs choose restart configuration set at cluster level

Added java doc for fallback restart strategy

This closes #2592.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac399792
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac399792
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac399792

Branch: refs/heads/master
Commit: ac3997927e38f9da45de0b001d308430c323c842
Parents: 852c529
Author: Nagarjun <ng...@netflix.com>
Authored: Wed Oct 5 00:26:05 2016 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Nov 4 14:56:56 2016 +0100

----------------------------------------------------------------------
 docs/setup/fault_tolerance.md                     |  6 ++++++
 .../common/restartstrategy/RestartStrategies.java | 18 ++++++++++++++++++
 .../restart/RestartStrategyFactory.java           |  2 ++
 .../flink/runtime/jobmanager/JobManager.scala     |  7 ++++---
 4 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/docs/setup/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/setup/fault_tolerance.md b/docs/setup/fault_tolerance.md
index 20f68ee..fa1c821 100644
--- a/docs/setup/fault_tolerance.md
+++ b/docs/setup/fault_tolerance.md
@@ -457,4 +457,10 @@ env.setRestartStrategy(RestartStrategies.noRestart())
 </div>
 </div>
 
+### Fallback Restart Strategy
+
+The cluster-defined restart strategy is used.
+This is helpful for streaming programs which enable checkpointing.
+By default, a fixed-delay restart strategy is chosen if no other restart strategy is defined.
+
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
index d5db466..7073c2c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@@ -42,6 +42,10 @@ public class RestartStrategies {
 		return new NoRestartStrategyConfiguration();
 	}
 
+	public static RestartStrategyConfiguration fallBackRestart() {
+		return new FallbackRestartStrategyConfiguration();
+	}
+
 	/**
 	 * Generates a FixedDelayRestartStrategyConfiguration.
 	 *
@@ -173,4 +177,18 @@ public class RestartStrategies {
 					+ " and fixed delay " + delayBetweenAttemptsInterval.toString();
 		}
 	}
+
+	/**
+	 * Restart strategy configuration that lets a job fall back to the cluster-level restart
+	 * strategy. Especially useful when a custom restart strategy implementation is set via
+	 * flink-conf.yaml.
+	 */
+	final public static class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration{
+		private static final long serialVersionUID = -4441787204284085544L;
+
+		@Override
+		public String getDescription() {
+			return "Cluster level default restart strategy";
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index 870bf63..27ee9b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -66,6 +66,8 @@ public abstract class RestartStrategyFactory implements Serializable {
 					config.getFailureInterval(),
 					config.getDelayBetweenAttemptsInterval()
 			);
+		} else if (restartStrategyConfiguration instanceof RestartStrategies.FallbackRestartStrategyConfiguration) {
+			return null;
 		} else {
 			throw new IllegalArgumentException("Unknown restart strategy configuration " +
 				restartStrategyConfiguration + ".");

http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9cc8be6..3f0689f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1221,7 +1221,8 @@ class JobManager(
           Option(jobGraph.getSerializedExecutionConfig()
             .deserializeValue(userCodeLoader)
             .getRestartStrategy())
-            .map(RestartStrategyFactory.createRestartStrategy) match {
+            .map(RestartStrategyFactory.createRestartStrategy)
+            .filter(p => p != null) match {
             case Some(strategy) => strategy
             case None => restartStrategyFactory.createRestartStrategy()
           }
@@ -1534,7 +1535,7 @@ class JobManager(
       case _ => unhandled(actorMsg)
     }
   }
-  
+
   /**
    * Handle unmatched messages with an exception.
    */
@@ -2607,7 +2608,7 @@ object JobManager {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
       case None => actorSystem.actorOf(jobManagerProps)
     }
-    
+
     metricsRegistry match {
       case Some(registry) =>
         registry.startQueryService(actorSystem, null)