You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/04/29 11:23:18 UTC

flink git commit: Revert "[FLINK-3800] [jobmanager] Terminate ExecutionGraphs properly"

Repository: flink
Updated Branches:
  refs/heads/release-1.0 f1d34b17b -> 0708dd08b


Revert "[FLINK-3800] [jobmanager] Terminate ExecutionGraphs properly"

This reverts commit 014a686ec5ea0e8809a5235fe988f828e5e70833.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0708dd08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0708dd08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0708dd08

Branch: refs/heads/release-1.0
Commit: 0708dd08b57d8cfca20c470ab4e909ea56cb9a38
Parents: f1d34b1
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Apr 29 11:22:33 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Apr 29 11:22:33 2016 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  28 +--
 .../restart/FixedDelayRestartStrategy.java      |  30 +--
 .../restart/NoRestartStrategy.java              |  17 +-
 .../executiongraph/restart/RestartStrategy.java |   5 -
 .../restart/RestartStrategyFactory.java         |  28 +--
 .../flink/runtime/jobmanager/JobManager.scala   |  16 +-
 .../JobManagerLeaderElectionTest.java           |   2 +-
 .../LeaderChangeJobRecoveryTest.java            | 198 -------------------
 .../LeaderChangeStateCleanupTest.java           |   2 +-
 .../LeaderElectionRetrievalTestingCluster.java  |  23 +--
 .../runtime/testingUtils/TestingCluster.scala   |  10 +-
 .../testingUtils/TestingJobManager.scala        |   6 +-
 .../flink/yarn/TestingYarnJobManager.scala      |   8 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   8 +-
 14 files changed, 49 insertions(+), 332 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8cb1ded..7cb83cd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -790,14 +790,6 @@ public class ExecutionGraph implements Serializable {
 	}
 
 	public void fail(Throwable t) {
-		if (t instanceof UnrecoverableException) {
-			if (restartStrategy != null) {
-				// disable the restart strategy in case that we have seen a SuppressRestartsException
-				// it basically overrides the restart behaviour of a the root cause
-				restartStrategy.disable();
-			}
-		}
-
 		while (true) {
 			JobStatus current = state;
 			if (current == JobStatus.FAILING || current.isTerminalState()) {
@@ -1021,17 +1013,15 @@ public class ExecutionGraph implements Serializable {
 						}
 					}
 					else if (current == JobStatus.FAILING) {
-						if (restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
-							// double check in case that in the meantime a SuppressRestartsException was thrown
-							if (restartStrategy.canRestart()) {
-								restartStrategy.restart(this);
-								break;
-							} else {
-								fail(new Exception("ExecutionGraph went into RESTARTING state but " +
-									"then the restart strategy was disabled."));
-							}
-
-						} else if (!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, failureCause)) {
+						boolean isRecoverable = !(failureCause instanceof UnrecoverableException);
+
+						if (isRecoverable && restartStrategy.canRestart() &&
+								transitionState(current, JobStatus.RESTARTING)) {
+							restartStrategy.restart(this);
+							break;
+
+						} else if ((!isRecoverable || !restartStrategy.canRestart()) &&
+							transitionState(current, JobStatus.FAILED, failureCause)) {
 							postRunCleanup();
 							break;
 						}

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 464b48e..d3c7eba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -41,7 +41,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 	private final int maxNumberRestartAttempts;
 	private final long delayBetweenRestartAttempts;
 	private int currentRestartAttempt;
-	private boolean disabled = false;
 
 	public FixedDelayRestartStrategy(
 		int maxNumberRestartAttempts,
@@ -61,7 +60,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 
 	@Override
 	public boolean canRestart() {
-		return !disabled && currentRestartAttempt < maxNumberRestartAttempts;
+		return currentRestartAttempt < maxNumberRestartAttempts;
 	}
 
 	@Override
@@ -84,11 +83,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 		}, executionGraph.getExecutionContext());
 	}
 
-	@Override
-	public void disable() {
-		disabled = true;
-	}
-
 	/**
 	 * Creates a FixedDelayRestartStrategy from the given Configuration.
 	 *
@@ -96,7 +90,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 	 * @return Initialized instance of FixedDelayRestartStrategy
 	 * @throws Exception
 	 */
-	public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
+	public static FixedDelayRestartStrategy create(Configuration configuration) throws Exception {
 		int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
 
 		String timeoutString = configuration.getString(
@@ -124,7 +118,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 			}
 		}
 
-		return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
+		return new FixedDelayRestartStrategy(maxAttempts, delay);
 	}
 
 	@Override
@@ -134,22 +128,4 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 				", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts +
 				')';
 	}
-
-	public static class FixedDelayRestartStrategyFactory extends RestartStrategyFactory {
-
-		private static final long serialVersionUID = 6642934067762271950L;
-
-		private final int maxAttempts;
-		private final long delay;
-
-		public FixedDelayRestartStrategyFactory(int maxAttempts, long delay) {
-			this.maxAttempts = maxAttempts;
-			this.delay = delay;
-		}
-
-		@Override
-		public RestartStrategy createRestartStrategy() {
-			return new FixedDelayRestartStrategy(maxAttempts, delay);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index 6cc5ee4..8911a98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -36,31 +36,18 @@ public class NoRestartStrategy implements RestartStrategy {
 		throw new RuntimeException("NoRestartStrategy does not support restart.");
 	}
 
-	@Override
-	public void disable() {}
-
 	/**
 	 * Creates a NoRestartStrategy instance.
 	 *
 	 * @param configuration Configuration object which is ignored
 	 * @return NoRestartStrategy instance
 	 */
-	public static NoRestartStrategyFactory createFactory(Configuration configuration) {
-		return new NoRestartStrategyFactory();
+	public static NoRestartStrategy create(Configuration configuration) {
+		return new NoRestartStrategy();
 	}
 
 	@Override
 	public String toString() {
 		return "NoRestartStrategy";
 	}
-
-	public static class NoRestartStrategyFactory extends RestartStrategyFactory {
-
-		private static final long serialVersionUID = -1809462525812787862L;
-
-		@Override
-		public RestartStrategy createRestartStrategy() {
-			return new NoRestartStrategy();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index c9e6277..2880c01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -38,9 +38,4 @@ public interface RestartStrategy {
 	 * @param executionGraph The ExecutionGraph to be restarted
 	 */
 	void restart(ExecutionGraph executionGraph);
-
-	/**
-	 * Disables the restart strategy.
-	 */
-	void disable();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index e58d775..68d114e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -25,21 +25,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 
-import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
-public abstract class RestartStrategyFactory implements Serializable {
-	private static final long serialVersionUID = 7320252552640522191L;
-
+public class RestartStrategyFactory {
 	private static final Logger LOG = LoggerFactory.getLogger(RestartStrategyFactory.class);
-	private static final String CREATE_METHOD = "createFactory";
-
-	/**
-	 * Factory method to create a restart strategy
-	 * @return The created restart strategy
-	 */
-	public abstract RestartStrategy createRestartStrategy();
+	private static final String CREATE_METHOD = "create";
 
 	/**
 	 * Creates a {@link RestartStrategy} instance from the given {@link org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}.
@@ -67,10 +58,11 @@ public abstract class RestartStrategyFactory implements Serializable {
 	/**
 	 * Creates a {@link RestartStrategy} instance from the given {@link Configuration}.
 	 *
+	 * @param configuration Configuration object containing the configuration values.
 	 * @return RestartStrategy instance
 	 * @throws Exception which indicates that the RestartStrategy could not be instantiated.
 	 */
-	public static RestartStrategyFactory createRestartStrategyFactory(Configuration configuration) throws Exception {
+	public static RestartStrategy createFromConfig(Configuration configuration) throws Exception {
 		String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, "none").toLowerCase();
 
 		switch (restartStrategyName) {
@@ -100,16 +92,16 @@ public abstract class RestartStrategyFactory implements Serializable {
 				}
 
 				if (numberExecutionRetries > 0 && delay >= 0) {
-					return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay);
+					return new FixedDelayRestartStrategy(numberExecutionRetries, delay);
 				} else {
-					return NoRestartStrategy.createFactory(configuration);
+					return NoRestartStrategy.create(configuration);
 				}
 			case "off":
 			case "disable":
-				return NoRestartStrategy.createFactory(configuration);
+				return NoRestartStrategy.create(configuration);
 			case "fixeddelay":
 			case "fixed-delay":
-				return FixedDelayRestartStrategy.createFactory(configuration);
+				return FixedDelayRestartStrategy.create(configuration);
 			default:
 				try {
 					Class<?> clazz = Class.forName(restartStrategyName);
@@ -121,7 +113,7 @@ public abstract class RestartStrategyFactory implements Serializable {
 							Object result = method.invoke(null, configuration);
 
 							if (result != null) {
-								return (RestartStrategyFactory) result;
+								return (RestartStrategy) result;
 							}
 						}
 					}
@@ -136,7 +128,7 @@ public abstract class RestartStrategyFactory implements Serializable {
 				}
 
 				// fallback in case of an error
-				return NoRestartStrategy.createFactory(configuration);
+				return NoRestartStrategy.create(configuration);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 6391c82..cee5606 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -28,7 +28,6 @@ import akka.actor._
 import akka.pattern.ask
 
 import grizzled.slf4j.Logger
-import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
@@ -112,7 +111,7 @@ class JobManager(
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
     protected val archive: ActorRef,
-    protected val restartStrategyFactory: RestartStrategyFactory,
+    protected val defaultRestartStrategy: RestartStrategy,
     protected val timeout: FiniteDuration,
     protected val leaderElectionService: LeaderElectionService,
     protected val submittedJobGraphs : SubmittedJobGraphStore,
@@ -201,7 +200,7 @@ class JobManager(
     log.info(s"Stopping JobManager $getAddress.")
 
     val newFuturesToComplete = cancelAndClearEverything(
-      new UnrecoverableException(new Exception("The JobManager is shutting down.")),
+      new Exception("The JobManager is shutting down."),
       removeJobFromStateBackend = true)
 
     implicit val executionContext = context.dispatcher
@@ -298,7 +297,7 @@ class JobManager(
       log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")
 
       val newFuturesToComplete = cancelAndClearEverything(
-        new UnrecoverableException(new Exception("JobManager is no longer the leader.")),
+        new Exception("JobManager is no longer the leader."),
         removeJobFromStateBackend = false)
 
       futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete)
@@ -951,7 +950,7 @@ class JobManager(
         val restartStrategy = Option(jobGraph.getRestartStrategyConfiguration())
           .map(RestartStrategyFactory.createRestartStrategy(_)) match {
             case Some(strategy) => strategy
-            case None => restartStrategyFactory.createRestartStrategy()
+            case None => defaultRestartStrategy
           }
 
         log.info(s"Using restart strategy $restartStrategy for $jobId.")
@@ -1495,7 +1494,7 @@ class JobManager(
     * @param cause Cause for the cancelling.
     */
   private def cancelAndClearEverything(
-      cause: UnrecoverableException,
+      cause: Throwable,
       removeJobFromStateBackend: Boolean)
     : Seq[Future[Unit]] = {
     val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
@@ -2097,7 +2096,7 @@ object JobManager {
     InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
-    RestartStrategyFactory,
+    RestartStrategy,
     FiniteDuration, // timeout
     Int, // number of archived jobs
     LeaderElectionService,
@@ -2113,7 +2112,8 @@ object JobManager {
       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
 
-    val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration)
+    val restartStrategy = RestartStrategyFactory
+      .createFromConfig(configuration)
 
     val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index afc46a7..fe35c0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -192,7 +192,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 				new Scheduler(TestingUtils.defaultExecutionContext()),
 				new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
 				ActorRef.noSender(),
-				new NoRestartStrategy.NoRestartStrategyFactory(),
+				new NoRestartStrategy(),
 				AkkaUtils.getDefaultTimeout(),
 				leaderElectionService,
 				submittedJobGraphStore,

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
deleted file mode 100644
index b13ae81..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.leaderelection;
-
-import akka.actor.ActorRef;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.util.TestLogger;
-import org.junit.Before;
-import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertTrue;
-
-public class LeaderChangeJobRecoveryTest extends TestLogger {
-
-	private static FiniteDuration timeout = FiniteDuration.apply(30, TimeUnit.SECONDS);
-
-	private int numTMs = 1;
-	private int numSlotsPerTM = 1;
-	private int parallelism = numTMs * numSlotsPerTM;
-
-	private Configuration configuration;
-	private LeaderElectionRetrievalTestingCluster cluster = null;
-	private JobGraph job = createBlockingJob(parallelism);
-
-	@Before
-	public void before() throws TimeoutException, InterruptedException {
-		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
-
-		configuration = new Configuration();
-
-		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
-		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
-
-		cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, new FixedDelayRestartStrategy(9999, 100));
-		cluster.start(false);
-
-		// wait for actors to be alive so that they have started their leader retrieval service
-		cluster.waitForActorsToBeAlive();
-	}
-
-	/**
-	 * Tests that the job is not restarted or at least terminates eventually in case that the
-	 * JobManager loses its leadership.
-	 *
-	 * @throws Exception
-	 */
-	@Test
-	public void testNotRestartedWhenLosingLeadership() throws Exception {
-		UUID leaderSessionID = UUID.randomUUID();
-
-		cluster.grantLeadership(0, leaderSessionID);
-		cluster.notifyRetrievalListeners(0, leaderSessionID);
-
-		cluster.waitForTaskManagersToBeRegistered(timeout);
-
-		cluster.submitJobDetached(job);
-
-		ActorGateway jm = cluster.getLeaderGateway(timeout);
-
-		Future<Object> wait = jm.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(job.getJobID()), timeout);
-
-		Await.ready(wait, timeout);
-
-		Future<Object> futureExecutionGraph = jm.ask(new TestingJobManagerMessages.RequestExecutionGraph(job.getJobID()), timeout);
-
-		TestingJobManagerMessages.ResponseExecutionGraph responseExecutionGraph =
-			(TestingJobManagerMessages.ResponseExecutionGraph) Await.result(futureExecutionGraph, timeout);
-
-		assertTrue(responseExecutionGraph instanceof TestingJobManagerMessages.ExecutionGraphFound);
-
-		ExecutionGraph executionGraph = ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
-
-		TestActorGateway testActorGateway = new TestActorGateway();
-
-		executionGraph.registerJobStatusListener(testActorGateway);
-
-		cluster.revokeLeadership();
-
-		Future<Boolean> hasReachedTerminalState = testActorGateway.hasReachedTerminalState();
-
-		assertTrue("The job should have reached a terminal state.", Await.result(hasReachedTerminalState, timeout));
-	}
-
-	public JobGraph createBlockingJob(int parallelism) {
-		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
-
-		JobVertex sender = new JobVertex("sender");
-		JobVertex receiver = new JobVertex("receiver");
-
-		sender.setInvokableClass(Tasks.Sender.class);
-		receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class);
-
-		sender.setParallelism(parallelism);
-		receiver.setParallelism(parallelism);
-
-		receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
-
-		SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
-		sender.setSlotSharingGroup(slotSharingGroup);
-		receiver.setSlotSharingGroup(slotSharingGroup);
-
-		return new JobGraph("Blocking test job", sender, receiver);
-	}
-
-	public static class TestActorGateway implements ActorGateway {
-
-		private static final long serialVersionUID = -736146686160538227L;
-		private transient Promise<Boolean> terminalState = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-		public Future<Boolean> hasReachedTerminalState() {
-			return terminalState.future();
-		}
-
-		@Override
-		public Future<Object> ask(Object message, FiniteDuration timeout) {
-			return null;
-		}
-
-		@Override
-		public void tell(Object message) {
-			this.tell(message, new AkkaActorGateway(ActorRef.noSender(), null));
-		}
-
-		@Override
-		public void tell(Object message, ActorGateway sender) {
-			if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
-				ExecutionGraphMessages.JobStatusChanged jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) message;
-
-				if (jobStatusChanged.newJobStatus().isTerminalState()) {
-					terminalState.success(true);
-				}
-			}
-		}
-
-		@Override
-		public void forward(Object message, ActorGateway sender) {
-
-		}
-
-		@Override
-		public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
-			return null;
-		}
-
-		@Override
-		public String path() {
-			return null;
-		}
-
-		@Override
-		public ActorRef actor() {
-			return null;
-		}
-
-		@Override
-		public UUID leaderSessionID() {
-			return null;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index 7876ff7..c490a64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -67,7 +67,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
 
-		cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, null);
+		cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false);
 		cluster.start(false); // TaskManagers don't have to register at the JobManager
 
 		cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index cd89fa6..c8cf868 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import scala.Option;
@@ -39,7 +38,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 
 	private final Configuration userConfiguration;
 	private final boolean useSingleActorSystem;
-	private final RestartStrategy restartStrategy;
 
 	public List<TestingLeaderElectionService> leaderElectionServices;
 	public List<TestingLeaderRetrievalService> leaderRetrievalServices;
@@ -49,8 +47,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 	public LeaderElectionRetrievalTestingCluster(
 			Configuration userConfiguration,
 			boolean singleActorSystem,
-			boolean synchronousDispatcher,
-			RestartStrategy restartStrategy) {
+			boolean synchronousDispatcher) {
 		super(userConfiguration, singleActorSystem, synchronousDispatcher);
 
 		this.userConfiguration = userConfiguration;
@@ -58,8 +55,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 
 		leaderElectionServices = new ArrayList<TestingLeaderElectionService>();
 		leaderRetrievalServices = new ArrayList<TestingLeaderRetrievalService>();
-
-		this.restartStrategy = restartStrategy;
 	}
 
 	@Override
@@ -95,15 +90,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 				ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
 	}
 
-	@Override
-	public RestartStrategy getRestartStrategy(RestartStrategy other) {
-		if (this.restartStrategy != null) {
-			return this.restartStrategy;
-		} else {
-			return other;
-		}
-	}
-
 	public void grantLeadership(int index, UUID leaderSessionID) {
 		if(leaderIndex >= 0) {
 			// first revoke leadership
@@ -123,11 +109,4 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster {
 			service.notifyListener(address, leaderSessionID);
 		}
 	}
-
-	public void revokeLeadership() {
-		if (leaderIndex >= 0) {
-			leaderElectionServices.get(leaderIndex).notLeader();
-			leaderIndex = -1;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 8f321bb..c72eb50 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -24,10 +24,10 @@ import akka.pattern.ask
 import akka.actor.{ActorRef, Props, ActorSystem}
 import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
+import org.apache.flink.util.NetUtils
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
 
@@ -96,7 +96,7 @@ class TestingCluster(
     instanceManager,
     scheduler,
     libraryCacheManager,
-    restartStrategyFactory,
+    restartStrategy,
     timeout,
     archiveCount,
     leaderElectionService,
@@ -118,7 +118,7 @@ class TestingCluster(
         scheduler,
         libraryCacheManager,
         archive,
-        restartStrategyFactory,
+        restartStrategy,
         timeout,
         leaderElectionService,
         submittedJobsGraphs,
@@ -155,10 +155,6 @@ class TestingCluster(
     None
   }
 
-  def getRestartStrategy(restartStrategy: RestartStrategy) = {
-    restartStrategy
-  }
-
   @throws(classOf[TimeoutException])
   @throws(classOf[InterruptedException])
   def waitForTaskManagersToBeAlive(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index e854b13..53867e0 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -23,7 +23,7 @@ import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
@@ -44,7 +44,7 @@ class TestingJobManager(
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
+    restartStrategy: RestartStrategy,
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
@@ -58,7 +58,7 @@ class TestingJobManager(
     scheduler,
     libraryCacheManager,
     archive,
-    restartStrategyFactory,
+    restartStrategy,
     timeout,
     leaderElectionService,
     submittedJobGraphs,

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 717d631..4e6b745 100644
--- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -24,7 +24,7 @@ import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
@@ -46,7 +46,7 @@ import scala.concurrent.duration.FiniteDuration
   * @param scheduler Scheduler to schedule Flink jobs
   * @param libraryCacheManager Manager to manage uploaded jar files
   * @param archive Archive for finished Flink jobs
-  * @param restartStrategyFactory Default restart strategy for job restarts
+  * @param restartStrategy Default restart strategy for job restarts
   * @param timeout Timeout for futures
   * @param leaderElectionService LeaderElectionService to participate in the leader election
   */
@@ -57,7 +57,7 @@ class TestingYarnJobManager(
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
+    restartStrategy: RestartStrategy,
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
@@ -71,7 +71,7 @@ class TestingYarnJobManager(
     scheduler,
     libraryCacheManager,
     archive,
-    restartStrategyFactory,
+    restartStrategy,
     timeout,
     leaderElectionService,
     submittedJobGraphs,

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index a6d587b..314c5bd 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory}
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
@@ -75,7 +75,7 @@ import scala.util.Try
   * @param scheduler Scheduler to schedule Flink jobs
   * @param libraryCacheManager Manager to manage uploaded jar files
   * @param archive Archive for finished Flink jobs
-  * @param restartStrategyFactory Restart strategy to be used in case of a job recovery
+  * @param restartStrategy Restart strategy to be used in case of a job recovery
   * @param timeout Timeout for futures
   * @param leaderElectionService LeaderElectionService to participate in the leader election
   */
@@ -86,7 +86,7 @@ class YarnJobManager(
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
+    restartStrategy: RestartStrategy,
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
@@ -100,7 +100,7 @@ class YarnJobManager(
     scheduler,
     libraryCacheManager,
     archive,
-    restartStrategyFactory,
+    restartStrategy,
     timeout,
     leaderElectionService,
     submittedJobGraphs,