You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/14 05:52:31 UTC

[11/12] flink git commit: [FLINK-9143] Use cluster strategy if none was set on client side

[FLINK-9143] Use cluster strategy if none was set on client side

Added NoOrFixedIfCheckpointingEnabledRestartStrategyFactory

This closes #6283.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57872d53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57872d53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57872d53

Branch: refs/heads/master
Commit: 57872d53c4584faace6dc8e4038ad1f2d068a453
Parents: c9ad0a0
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Thu Jul 5 13:48:23 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 18:32:57 2018 +0200

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       | 10 +--
 .../restartstrategy/RestartStrategies.java      | 48 ++++++++++++-
 .../runtime/webmonitor/WebFrontendITCase.java   |  2 +-
 ...ckpointingEnabledRestartStrategyFactory.java | 42 ++++++++++++
 .../restart/NoRestartStrategy.java              |  4 +-
 .../restart/RestartStrategyFactory.java         | 63 ++++++++---------
 .../restart/RestartStrategyResolving.java       | 66 ++++++++++++++++++
 .../apache/flink/runtime/jobgraph/JobGraph.java | 20 +++++-
 .../flink/runtime/jobmaster/JobMaster.java      | 16 +++--
 .../flink/runtime/jobmanager/JobManager.scala   | 20 +++---
 .../checkpoint/CoordinatorShutdownTest.java     | 19 ++++--
 .../restart/RestartStrategyResolvingTest.java   | 71 ++++++++++++++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java  | 41 +++++++++++
 .../TestingJobManagerSharedServicesBuilder.java |  4 +-
 .../api/graph/StreamingJobGraphGenerator.java   | 15 -----
 .../streaming/api/RestartStrategyTest.java      | 10 ++-
 16 files changed, 367 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 7a0a574..59fa803 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.Preconditions;
 
 import com.esotericsoftware.kryo.Serializer;
 
@@ -138,7 +139,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	@Deprecated
 	private long executionRetryDelay = DEFAULT_RESTART_DELAY;
 
-	private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;
+	private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
+		new RestartStrategies.FallbackRestartStrategyConfiguration();
 	
 	private long taskCancellationIntervalMillis = -1;
 
@@ -390,7 +392,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	 */
 	@PublicEvolving
 	public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
-		this.restartStrategyConfiguration = restartStrategyConfiguration;
+		this.restartStrategyConfiguration = Preconditions.checkNotNull(restartStrategyConfiguration); // null is rejected; the field defaults to FallbackRestartStrategyConfiguration
 	}
 
 	/**
@@ -401,14 +403,14 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	@PublicEvolving
 	@SuppressWarnings("deprecation")
 	public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
-		if (restartStrategyConfiguration == null) {
+		if (restartStrategyConfiguration instanceof RestartStrategies.FallbackRestartStrategyConfiguration) {
 			// support the old API calls by creating a restart strategy from them
 			if (getNumberOfExecutionRetries() > 0 && getExecutionRetryDelay() >= 0) {
 				return RestartStrategies.fixedDelayRestart(getNumberOfExecutionRetries(), getExecutionRetryDelay());
 			} else if (getNumberOfExecutionRetries() == 0) {
 				return RestartStrategies.noRestart();
 			} else {
-				return null;
+				return restartStrategyConfiguration;
 			}
 		} else {
 			return restartStrategyConfiguration;

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
index f3eb3a5..4f67290 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.time.Time;
 
 import java.io.Serializable;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -106,6 +107,19 @@ public class RestartStrategies {
 		public String getDescription() {
 			return "Restart deactivated.";
 		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			return o instanceof NoRestartStrategyConfiguration; // no fields compared: any instance is equal
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(); // constant hash, consistent with equals()
+		}
 	}
 
 	/**
@@ -188,6 +202,25 @@ public class RestartStrategies {
 			return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString()
 					+ " and fixed delay " + delayBetweenAttemptsInterval.toString();
 		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			FailureRateRestartStrategyConfiguration that = (FailureRateRestartStrategyConfiguration) o;
+			return maxFailureRate == that.maxFailureRate && // same three fields as hashCode()
+				Objects.equals(failureInterval, that.failureInterval) &&
+				Objects.equals(delayBetweenAttemptsInterval, that.delayBetweenAttemptsInterval);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(maxFailureRate, failureInterval, delayBetweenAttemptsInterval); // same fields as equals()
+		}
 	}
 
 	/**
@@ -195,12 +228,25 @@ public class RestartStrategies {
 	 * strategy. Useful especially when one has a custom implementation of restart strategy set via
 	 * flink-conf.yaml.
 	 */
-	public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration{
+	public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration {
 		private static final long serialVersionUID = -4441787204284085544L;
 
 		@Override
 		public String getDescription() {
 			return "Cluster level default restart strategy";
 		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			return o instanceof FallbackRestartStrategyConfiguration; // no fields compared: any instance is equal
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(); // constant hash, consistent with equals()
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index fb8258a..b90277f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -313,7 +313,7 @@ public class WebFrontendITCase extends TestLogger {
 			assertEquals(HttpResponseStatus.OK, response.getStatus());
 			assertEquals("application/json; charset=UTF-8", response.getType());
 			assertEquals("{\"jid\":\"" + jid + "\",\"name\":\"Stoppable streaming test job\"," +
-				"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," +
+				"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"Cluster level default restart strategy\"," +
 				"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java
new file mode 100644
index 0000000..7b5c1a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+/**
+ * Default restart strategy factory that resolves either to {@link NoRestartStrategy} or {@link FixedDelayRestartStrategy},
+ * depending on whether checkpointing is enabled for the job.
+ */
+public class NoOrFixedIfCheckpointingEnabledRestartStrategyFactory extends RestartStrategyFactory {
+	private static final long DEFAULT_RESTART_DELAY = 0; // zero delay between restart attempts
+
+	private static final long serialVersionUID = -1809462525812787862L;
+
+	@Override
+	public RestartStrategy createRestartStrategy() {
+		return createRestartStrategy(false); // no job context available here, so assume checkpointing is disabled
+	}
+
+	RestartStrategy createRestartStrategy(boolean isCheckpointingEnabled) { // package-private; called by RestartStrategyResolving
+		if (isCheckpointingEnabled) {
+			return new FixedDelayRestartStrategy(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY); // Integer.MAX_VALUE: effectively unlimited attempts
+		} else {
+			return new NoRestartStrategy();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index 5502d2d..b639614 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -38,10 +38,10 @@ public class NoRestartStrategy implements RestartStrategy {
 	}
 
 	/**
-	 * Creates a NoRestartStrategy instance.
+	 * Creates a NoRestartStrategyFactory instance.
 	 *
 	 * @param configuration Configuration object which is ignored
-	 * @return NoRestartStrategy instance
+	 * @return NoRestartStrategyFactory instance
 	 */
 	public static NoRestartStrategyFactory createFactory(Configuration configuration) {
 		return new NoRestartStrategyFactory();

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index 717e1d2..f15ee0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -82,38 +82,41 @@ public abstract class RestartStrategyFactory implements Serializable {
 	 * @throws Exception which indicates that the RestartStrategy could not be instantiated.
 	 */
 	public static RestartStrategyFactory createRestartStrategyFactory(Configuration configuration) throws Exception {
-		String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, "none");
+		String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, null);
+
+		if (restartStrategyName == null) {
+			// support deprecated ConfigConstants values
+			final int numberExecutionRetries = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
+				ConfigConstants.DEFAULT_EXECUTION_RETRIES);
+			String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
+			String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
+				pauseString);
+
+			long delay;
+
+			try {
+				delay = Duration.apply(delayString).toMillis();
+			} catch (NumberFormatException nfe) {
+				if (delayString.equals(pauseString)) {
+					throw new Exception("Invalid config value for " +
+						AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString +
+						". Value must be a valid duration (such as '10 s' or '1 min')");
+				} else {
+					throw new Exception("Invalid config value for " +
+						ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
+						". Value must be a valid duration (such as '100 milli' or '10 s')");
+				}
+			}
+
+			if (numberExecutionRetries > 0 && delay >= 0) {
+				return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay);
+			} else {
+				return new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
+			}
+		}
 
 		switch (restartStrategyName.toLowerCase()) {
 			case "none":
-				// support deprecated ConfigConstants values
-				final int numberExecutionRetries = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
-					ConfigConstants.DEFAULT_EXECUTION_RETRIES);
-				String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
-				String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
-					pauseString);
-
-				long delay;
-
-				try {
-					delay = Duration.apply(delayString).toMillis();
-				} catch (NumberFormatException nfe) {
-					if (delayString.equals(pauseString)) {
-						throw new Exception("Invalid config value for " +
-							AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString +
-							". Value must be a valid duration (such as '10 s' or '1 min')");
-					} else {
-						throw new Exception("Invalid config value for " +
-							ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
-							". Value must be a valid duration (such as '100 milli' or '10 s')");
-					}
-				}
-
-				if (numberExecutionRetries > 0 && delay >= 0) {
-					return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay);
-				} else {
-					return NoRestartStrategy.createFactory(configuration);
-				}
 			case "off":
 			case "disable":
 				return NoRestartStrategy.createFactory(configuration);
@@ -149,7 +152,7 @@ public abstract class RestartStrategyFactory implements Serializable {
 				}
 
 				// fallback in case of an error
-				return NoRestartStrategy.createFactory(configuration);
+				return new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
new file mode 100644
index 0000000..ad7aa93
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+/**
+ * Utility class for resolving which {@link RestartStrategy} a job should use.
+ */
+public final class RestartStrategyResolving {
+
+	/**
+	 * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
+	 * The resolving strategy is as follows:
+	 * <ol>
+	 * <li>Strategy set within the job graph (client side), unless the client defers to the cluster (fallback).</li>
+	 * <li>Otherwise, the strategy configured on the server side (e.g. in flink-conf.yaml).</li>
+	 * <li>If the server side uses the default {@link NoOrFixedIfCheckpointingEnabledRestartStrategyFactory},
+	 * {@link FixedDelayRestartStrategy} is used when checkpointing is enabled and
+	 * {@link NoRestartStrategy} otherwise.</li>
+	 * </ol>
+	 *
+	 * @param clientConfiguration restart configuration given within the job graph
+	 * @param serverStrategyFactory server side strategy factory, used when the client did not choose a strategy
+	 * @param isCheckpointingEnabled if checkpointing was enabled for the job
+	 * @return resolved strategy
+	 */
+	public static RestartStrategy resolve(
+			RestartStrategies.RestartStrategyConfiguration clientConfiguration,
+			RestartStrategyFactory serverStrategyFactory,
+			boolean isCheckpointingEnabled) {
+
+		final RestartStrategy clientSideRestartStrategy =
+			RestartStrategyFactory.createRestartStrategy(clientConfiguration);
+
+		if (clientSideRestartStrategy != null) { // null: the client left the choice to the server (e.g. fallBackRestart())
+			return clientSideRestartStrategy;
+		} else {
+			if (serverStrategyFactory instanceof NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) { // server default: decide by checkpointing
+				return ((NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) serverStrategyFactory)
+					.createRestartStrategy(isCheckpointingEnabled);
+			} else {
+				return serverStrategyFactory.createRestartStrategy();
+			}
+		}
+	}
+
+	private RestartStrategyResolving() { // static utility; not meant to be instantiated
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index b3e03de..377f870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -329,7 +329,7 @@ public class JobGraph implements Serializable {
 	 * Sets the settings for asynchronous snapshots. A value of {@code null} means that
 	 * snapshotting is not enabled.
 	 *
-	 * @param settings The snapshot settings, or null, to disable snapshotting.
+	 * @param settings The snapshot settings
 	 */
 	public void setSnapshotSettings(JobCheckpointingSettings settings) {
 		this.snapshotSettings = settings;
@@ -339,13 +339,29 @@ public class JobGraph implements Serializable {
 	 * Gets the settings for asynchronous snapshots. This method returns null, when
 	 * checkpointing is not enabled.
 	 *
-	 * @return The snapshot settings, or null, if checkpointing is not enabled.
+	 * @return The snapshot settings
 	 */
 	public JobCheckpointingSettings getCheckpointingSettings() {
 		return snapshotSettings;
 	}
 
 	/**
+	 * Checks if the checkpointing was enabled for this job graph
+	 *
+	 * @return true if checkpointing enabled
+	 */
+	public boolean isCheckpointingEnabled() {
+
+		if (snapshotSettings == null) {
+			return false;
+		}
+
+		long checkpointInterval = snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
+		return checkpointInterval > 0 &&
+			checkpointInterval < Long.MAX_VALUE;
+	}
+
+	/**
 	 * Searches for a vertex with a matching ID and returns it.
 	 *
 	 * @param id

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7557bc3..1660f95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
@@ -51,7 +52,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -273,11 +274,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 						.deserializeValue(userCodeLoader)
 						.getRestartStrategy();
 
-		this.restartStrategy = (restartStrategyConfiguration != null) ?
-				RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) :
-				jobManagerSharedServices.getRestartStrategyFactory().createRestartStrategy();
+		this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
+			jobManagerSharedServices.getRestartStrategyFactory(),
+			jobGraph.isCheckpointingEnabled());
 
-		log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid);
+		log.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobName, jid);
 
 		resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
 
@@ -1649,4 +1650,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			return CompletableFuture.completedFuture(null);
 		}
 	}
+
+	@VisibleForTesting
+	RestartStrategy getRestartStrategy() { // the strategy resolved in the constructor via RestartStrategyResolving
+		return restartStrategy;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cebff58..1c8174f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph._
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.executiongraph.restart.{RestartStrategyFactory, RestartStrategyResolving}
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, InstanceManager}
@@ -1250,15 +1250,15 @@ class JobManager(
           throw new JobSubmissionException(jobId, "The given job is empty")
         }
 
-        val restartStrategy =
-          Option(jobGraph.getSerializedExecutionConfig()
-            .deserializeValue(userCodeLoader)
-            .getRestartStrategy())
-            .map(RestartStrategyFactory.createRestartStrategy)
-            .filter(p => p != null) match {
-            case Some(strategy) => strategy
-            case None => restartStrategyFactory.createRestartStrategy()
-          }
+        val restartStrategyConfiguration = jobGraph
+          .getSerializedExecutionConfig
+          .deserializeValue(userCodeLoader)
+          .getRestartStrategy
+
+        val restartStrategy = RestartStrategyResolving
+          .resolve(restartStrategyConfiguration,
+            restartStrategyFactory,
+            jobGraph.isCheckpointingEnabled)
 
         log.info(s"Using restart strategy $restartStrategy for $jobId.")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 8a6a9d8..f6b7730 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -40,14 +42,14 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -68,7 +70,10 @@ public class CoordinatorShutdownTest extends TestLogger {
 			JobVertex vertex = new JobVertex("Test Vertex");
 			vertex.setInvokableClass(FailingBlockingInvokable.class);
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
-			
+
+			final ExecutionConfig executionConfig = new ExecutionConfig();
+			executionConfig.setRestartStrategy(RestartStrategies.noRestart());
+
 			JobGraph testGraph = new JobGraph("test job", vertex);
 			testGraph.setSnapshotSettings(
 				new JobCheckpointingSettings(
@@ -83,7 +88,9 @@ public class CoordinatorShutdownTest extends TestLogger {
 						CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
 						true),
 					null));
-			
+			testGraph.setExecutionConfig(executionConfig);
+
+
 			ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
 			FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
new file mode 100644
index 0000000..4194e97
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fallBackRestart;
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+
+/**
+ * Tests for {@link RestartStrategyResolving}.
+ */
+public class RestartStrategyResolvingTest extends TestLogger {
+
+	@Test
+	public void testClientSideHighestPriority() {
+		RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(noRestart(),
+			new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(2, 1000L),
+			true);
+
+		assertThat(resolvedStrategy, instanceOf(NoRestartStrategy.class)); // client choice wins over the server factory
+	}
+
+	@Test
+	public void testFixedStrategySetWhenCheckpointingEnabled() {
+		RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(fallBackRestart(),
+			new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory(),
+			true);
+
+		assertThat(resolvedStrategy, instanceOf(FixedDelayRestartStrategy.class)); // server default + checkpointing => fixed delay
+	}
+
+	@Test
+	public void testServerStrategyIsUsedSetWhenCheckpointingEnabled() {
+		RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(fallBackRestart(),
+			new FailureRateRestartStrategy.FailureRateRestartStrategyFactory(5, Time.seconds(5), Time.seconds(2)),
+			true);
+
+		assertThat(resolvedStrategy, instanceOf(FailureRateRestartStrategy.class)); // explicit server strategy kept despite checkpointing
+	}
+
+	@Test
+	public void testServerStrategyIsUsedSetWhenCheckpointingDisabled() {
+		RestartStrategy resolvedStrategy = RestartStrategyResolving.resolve(fallBackRestart(),
+			new FailureRateRestartStrategy.FailureRateRestartStrategyFactory(5, Time.seconds(5), Time.seconds(2)),
+			false);
+
+		assertThat(resolvedStrategy, instanceOf(FailureRateRestartStrategy.class)); // explicit server strategy used without checkpointing
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index d7dc017..82fdc94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -46,6 +46,9 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -110,6 +113,8 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for {@link JobMaster}.
@@ -358,6 +363,42 @@ public class JobMasterTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that in a streaming use case where checkpointing is enabled, a
+	 * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
+	 * strategy has been specified.
+	 *
+	 * <p>The restart strategy factory is created from the same configuration that is passed to
+	 * the {@link JobMaster}. No savepoint is restored since it is irrelevant for the resolved
+	 * restart strategy.
+	 */
+	@Test
+	public void testAutomaticRestartingWhenCheckpointing() throws Exception {
+		final JobGraph jobGraph = createJobGraphWithCheckpointing(SavepointRestoreSettings.none());
+
+		// provide a checkpoint recovery factory for the checkpointed job graph
+		final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+		final TestingCheckpointRecoveryFactory testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
+			completedCheckpointStore,
+			new StandaloneCheckpointIDCounter());
+		haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+
+		// the empty configuration specifies no cluster side restart strategy
+		final Configuration jobMasterConfiguration = new Configuration();
+		final JobMaster jobMaster = createJobMaster(
+			jobMasterConfiguration,
+			jobGraph,
+			haServices,
+			new TestingJobManagerSharedServicesBuilder()
+				.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(jobMasterConfiguration))
+				.build());
+
+		final RestartStrategy restartStrategy = jobMaster.getRestartStrategy();
+
+		assertNotNull(restartStrategy);
+		assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
+	}
+
+	/**
 	 * Tests that an existing checkpoint will have precedence over an savepoint
 	 */
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
index f0b232a..030e4e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
@@ -48,7 +48,7 @@ public class TestingJobManagerSharedServicesBuilder {
 	public TestingJobManagerSharedServicesBuilder() {
 		scheduledExecutorService = TestingUtils.defaultExecutor();
 		libraryCacheManager = mock(LibraryCacheManager.class);
-		restartStrategyFactory = new NoRestartStrategy.NoRestartStrategyFactory();
+		restartStrategyFactory = new NoOrFixedIfCheckpointingEnabledRestartStrategyFactory(); // yields a fixed delay strategy for checkpointed jobs (see RestartStrategyResolvingTest)
 		stackTraceSampleCoordinator = mock(StackTraceSampleCoordinator.class);
 		backPressureStatsTracker = VoidBackPressureStatsTracker.INSTANCE;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 603b9e4..e905eac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -81,12 +80,6 @@ public class StreamingJobGraphGenerator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
 
-	/**
-	 * Restart delay used for the FixedDelayRestartStrategy in case checkpointing was enabled but
-	 * no restart strategy has been specified.
-	 */
-	private static final long DEFAULT_RESTART_DELAY = 0L;
-
 	// ------------------------------------------------------------------------
 
 	public static JobGraph createJobGraph(StreamGraph streamGraph) {
@@ -590,17 +583,9 @@ public class StreamingJobGraphGenerator {
 
 		long interval = cfg.getCheckpointInterval();
 		if (interval > 0) {
-
 			ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
 			// propagate the expected behaviour for checkpoint errors to task.
 			executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
-
-			// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
-			if (executionConfig.getRestartStrategy() == null) {
-				// if the user enabled checkpointing, the default number of exec retries is infinite.
-				executionConfig.setRestartStrategy(
-					RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
-			}
 		} else {
 			// interval of max value means disable periodic checkpoint
 			interval = Long.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
index b231bea..03b5a53 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
@@ -33,12 +33,11 @@ import org.junit.Test;
 public class RestartStrategyTest extends TestLogger {
 
 	/**
-	 * Tests that in a streaming use case where checkpointing is enabled, a
-	 * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
-	 * strategy has been specified.
+	 * Tests that in a streaming use case where checkpointing is enabled, the client side only sets a
+	 * fallback restart strategy configuration, so that the cluster side restart strategy is used.
 	 */
 	@Test
-	public void testAutomaticRestartingWhenCheckpointing() throws Exception {
+	public void testFallbackStrategyOnClientSideWhenCheckpointingEnabled() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.enableCheckpointing(500);
 
@@ -51,8 +50,7 @@ public class RestartStrategyTest extends TestLogger {
 			jobGraph.getSerializedExecutionConfig().deserializeValue(getClass().getClassLoader()).getRestartStrategy();
 
 		Assert.assertNotNull(restartStrategy);
-		Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);
-		Assert.assertEquals(Integer.MAX_VALUE, ((RestartStrategies.FixedDelayRestartStrategyConfiguration) restartStrategy).getRestartAttempts());
+		Assert.assertTrue(restartStrategy instanceof RestartStrategies.FallbackRestartStrategyConfiguration);
 	}
 
 	/**