You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/23 09:36:05 UTC

[flink] branch master updated: [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 21b92ac  [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase
21b92ac is described below

commit 21b92ac95f1824e9b1ca483fa3ffaaf77ef14d4a
Author: azagrebin <az...@users.noreply.github.com>
AuthorDate: Thu May 23 11:35:48 2019 +0200

    [FLINK-12181][runtime] Port ExecutionGraphRestartTest to new codebase
---
 .../executiongraph/ExecutionGraphRestartTest.java  | 709 ++++++++++-----------
 1 file changed, 331 insertions(+), 378 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 1c5b650..1e6be72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
@@ -35,9 +35,6 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.NotCancelAckingTaskGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -45,10 +42,19 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
+import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -58,23 +64,18 @@ import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Test;
 
-import javax.annotation.Nonnull;
-
 import java.io.IOException;
-import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
@@ -97,7 +98,10 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
 
-	private TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor = TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+	private static final TestingComponentMainThreadExecutorServiceAdapter mainThreadExecutor =
+		TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
+
+	private static final JobID TEST_JOB_ID = new JobID();
 
 	@After
 	public void shutdown() {
@@ -109,8 +113,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	@Test
 	public void testNoManualRestart() throws Exception {
 		NoRestartStrategy restartStrategy = new NoRestartStrategy();
-		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
-		ExecutionGraph eg = executionGraphInstanceTuple.f0;
+		ExecutionGraph eg = createSimpleExecutionGraph(
+			restartStrategy, new SimpleSlotProvider(TEST_JOB_ID, NUM_TASKS), createJobGraph());
 
 		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
 
@@ -136,149 +140,128 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	@Test
 	public void testRestartAutomatically() throws Exception {
-		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple =
-			createExecutionGraph(TestRestartStrategy.directExecuting());
-
-		ExecutionGraph eg = executionGraphInstanceTuple.f0;
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			restartAfterFailure(TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(TestRestartStrategy.directExecuting())
+				.buildAndScheduleForExecution(slotPool));
+		}
 
-		restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true);
 	}
 
 	@Test
 	public void testCancelWhileRestarting() throws Exception {
 		// We want to manually control the restart and delay
-		RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
-		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
-		ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
-		Instance instance = executionGraphInstanceTuple.f1;
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+			final ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(new InfiniteDelayRestartStrategy())
+				.setTaskManagerLocation(taskManagerLocation)
+				.buildAndScheduleForExecution(slotPool);
 
-		// Kill the instance and wait for the job to restart
-		instance.markDead();
-		Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+			// Release the TaskManager and wait for the job to restart
+			slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new Exception("Test Exception"));
+			assertEquals(JobStatus.RESTARTING, executionGraph.getState());
 
-		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+			// Canceling needs to abort the restart
+			executionGraph.cancel();
 
-		// Canceling needs to abort the restart
-		executionGraph.cancel();
+			assertEquals(JobStatus.CANCELED, executionGraph.getState());
 
-		assertEquals(JobStatus.CANCELED, executionGraph.getState());
+			// The restart has been aborted
+			executionGraph.restart(executionGraph.getGlobalModVersion());
 
-		// The restart has been aborted
-		executionGraph.restart(executionGraph.getGlobalModVersion());
+			assertEquals(JobStatus.CANCELED, executionGraph.getState());
+		}
 
-		assertEquals(JobStatus.CANCELED, executionGraph.getState());
 	}
 
 	@Test
 	public void testFailWhileRestarting() throws Exception {
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-
-		Instance instance = ExecutionGraphTestUtils.getInstance(
-			new SimpleAckingTaskManagerGateway(),
-			NUM_TASKS);
-
-		scheduler.newInstanceAvailable(instance);
-
-		// Blocking program
-		ExecutionGraph executionGraph = new ExecutionGraph(
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			new JobID(),
-			"TestJob",
-			new Configuration(),
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			// We want to manually control the restart and delay
-			new InfiniteDelayRestartStrategy(),
-			scheduler);
-
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
-
-		JobVertex jobVertex = new JobVertex("NoOpInvokable");
-		jobVertex.setInvokableClass(NoOpInvokable.class);
-		jobVertex.setParallelism(NUM_TASKS);
-
-		JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
-
-		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+			final ExecutionGraph executionGraph = TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(new InfiniteDelayRestartStrategy())
+				.setTaskManagerLocation(taskManagerLocation)
+				.buildAndScheduleForExecution(slotPool);
 
-		assertEquals(JobStatus.CREATED, executionGraph.getState());
+			// Release the TaskManager and wait for the job to restart
+			slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new Exception("Test Exception"));
 
-		executionGraph.scheduleForExecution();
-
-		assertEquals(JobStatus.RUNNING, executionGraph.getState());
-
-		// Kill the instance and wait for the job to restart
-		instance.markDead();
-
-		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+			assertEquals(JobStatus.RESTARTING, executionGraph.getState());
 
-		// If we fail when being in RESTARTING, then we should try to restart again
-		final long globalModVersion = executionGraph.getGlobalModVersion();
-		final Exception testException = new Exception("Test exception");
-		executionGraph.failGlobal(testException);
+			// If we fail when being in RESTARTING, then we should try to restart again
+			final long globalModVersion = executionGraph.getGlobalModVersion();
+			final Exception testException = new Exception("Test exception");
+			executionGraph.failGlobal(testException);
 
-		assertNotEquals(globalModVersion, executionGraph.getGlobalModVersion());
-		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
-		assertEquals(testException, executionGraph.getFailureCause()); // we should have updated the failure cause
+			assertNotEquals(globalModVersion, executionGraph.getGlobalModVersion());
+			assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+			assertEquals(testException, executionGraph.getFailureCause()); // we should have updated the failure cause
 
-		// but it should fail when sending a SuppressRestartsException
-		executionGraph.failGlobal(new SuppressRestartsException(new Exception("Suppress restart exception")));
+			// but it should fail when sending a SuppressRestartsException
+			executionGraph.failGlobal(new SuppressRestartsException(new Exception("Suppress restart exception")));
 
-		assertEquals(JobStatus.FAILED, executionGraph.getState());
+			assertEquals(JobStatus.FAILED, executionGraph.getState());
 
-		// The restart has been aborted
-		executionGraph.restart(executionGraph.getGlobalModVersion());
+			// The restart has been aborted
+			executionGraph.restart(executionGraph.getGlobalModVersion());
 
-		assertEquals(JobStatus.FAILED, executionGraph.getState());
+			assertEquals(JobStatus.FAILED, executionGraph.getState());
+		}
 	}
 
 	@Test
 	public void testCancelWhileFailing() throws Exception {
-		final RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
-		final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			final ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(new InfiniteDelayRestartStrategy())
+				.buildAndScheduleForExecution(slotPool);
 
-		assertEquals(JobStatus.RUNNING, graph.getState());
+			assertEquals(JobStatus.RUNNING, graph.getState());
 
-		// switch all tasks to running
-		for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
-			vertex.getCurrentExecutionAttempt().switchToRunning();
-		}
+			// switch all tasks to running
+			for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) {
+				vertex.getCurrentExecutionAttempt().switchToRunning();
+			}
 
-		graph.failGlobal(new Exception("test"));
+			graph.failGlobal(new Exception("test"));
 
-		assertEquals(JobStatus.FAILING, graph.getState());
+			assertEquals(JobStatus.FAILING, graph.getState());
 
-		graph.cancel();
+			graph.cancel();
 
-		assertEquals(JobStatus.CANCELLING, graph.getState());
+			assertEquals(JobStatus.CANCELLING, graph.getState());
 
-		// let all tasks finish cancelling
-		completeCanceling(graph);
+			// let all tasks finish cancelling
+			completeCanceling(graph);
+
+			assertEquals(JobStatus.CANCELED, graph.getState());
+		}
 
-		assertEquals(JobStatus.CANCELED, graph.getState());
 	}
 
 	@Test
 	public void testFailWhileCanceling() throws Exception {
-		final RestartStrategy restartStrategy = new NoRestartStrategy();
-		final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			final ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder().buildAndScheduleForExecution(slotPool);
+
+			assertEquals(JobStatus.RUNNING, graph.getState());
+			switchAllTasksToRunning(graph);
 
-		assertEquals(JobStatus.RUNNING, graph.getState());
-		switchAllTasksToRunning(graph);
+			graph.cancel();
 
-		graph.cancel();
+			assertEquals(JobStatus.CANCELLING, graph.getState());
 
-		assertEquals(JobStatus.CANCELLING, graph.getState());
+			graph.failGlobal(new Exception("test"));
 
-		graph.failGlobal(new Exception("test"));
+			assertEquals(JobStatus.FAILING, graph.getState());
 
-		assertEquals(JobStatus.FAILING, graph.getState());
+			// let all tasks finish cancelling
+			completeCanceling(graph);
 
-		// let all tasks finish cancelling
-		completeCanceling(graph);
+			assertEquals(JobStatus.FAILED, graph.getState());
+		}
 
-		assertEquals(JobStatus.FAILED, graph.getState());
 	}
 
 	private void switchAllTasksToRunning(ExecutionGraph graph) {
@@ -287,23 +270,28 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	@Test
 	public void testNoRestartOnSuppressException() throws Exception {
-		final ExecutionGraph eg = createExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0)).f0;
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0))
+				.buildAndScheduleForExecution(slotPool);
 
-		// Fail with unrecoverable Exception
-		eg.getAllExecutionVertices().iterator().next().fail(
-			new SuppressRestartsException(new Exception("Test Exception")));
+			// Fail with unrecoverable Exception
+			eg.getAllExecutionVertices().iterator().next().fail(
+				new SuppressRestartsException(new Exception("Test Exception")));
 
-		assertEquals(JobStatus.FAILING, eg.getState());
+			assertEquals(JobStatus.FAILING, eg.getState());
 
-		completeCanceling(eg);
+			completeCanceling(eg);
 
-		eg.waitUntilTerminal();
-		assertEquals(JobStatus.FAILED, eg.getState());
+			eg.waitUntilTerminal();
+			assertEquals(JobStatus.FAILED, eg.getState());
 
-		RestartStrategy restartStrategy = eg.getRestartStrategy();
-		assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
+			RestartStrategy restartStrategy = eg.getRestartStrategy();
+			assertTrue(restartStrategy instanceof FixedDelayRestartStrategy);
+
+			assertEquals(0, ((FixedDelayRestartStrategy) restartStrategy).getCurrentRestartAttempt());
+		}
 
-		assertEquals(0, ((FixedDelayRestartStrategy) restartStrategy).getCurrentRestartAttempt());
 	}
 
 	/**
@@ -313,56 +301,47 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	 */
 	@Test
 	public void testFailingExecutionAfterRestart() throws Exception {
-		Instance instance = ExecutionGraphTestUtils.getInstance(
-			new SimpleAckingTaskManagerGateway(),
-			2);
-
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-		scheduler.newInstanceAvailable(instance);
-
-		TestRestartStrategy restartStrategy = TestRestartStrategy.directExecuting();
-
 		JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
 		JobVertex receiver = ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
 		JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
-		ExecutionGraph eg = newExecutionGraph(restartStrategy, scheduler);
-		eg.start(mainThreadExecutor);
-		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
-		assertEquals(JobStatus.CREATED, eg.getState());
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(TestRestartStrategy.directExecuting())
+				.setJobGraph(jobGraph)
+				.setNumberOfTasks(2)
+				.buildAndScheduleForExecution(slotPool);
 
-		eg.scheduleForExecution();
-		assertEquals(JobStatus.RUNNING, eg.getState());
+			Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
 
-		Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
+			Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt();
+			Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt();
 
-		Execution finishedExecution = executionVertices.next().getCurrentExecutionAttempt();
-		Execution failedExecution = executionVertices.next().getCurrentExecutionAttempt();
+			finishedExecution.markFinished();
 
-		finishedExecution.markFinished();
+			failedExecution.fail(new Exception("Test Exception"));
+			failedExecution.completeCancelling();
 
-		failedExecution.fail(new Exception("Test Exception"));
-		failedExecution.completeCancelling();
-
-		assertEquals(JobStatus.RUNNING, eg.getState());
+			assertEquals(JobStatus.RUNNING, eg.getState());
 
-		// At this point all resources have been assigned
-		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-			assertNotNull("No assigned resource (test instability).", vertex.getCurrentAssignedResource());
-			vertex.getCurrentExecutionAttempt().switchToRunning();
-		}
+			// At this point all resources have been assigned
+			for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+				assertNotNull("No assigned resource (test instability).", vertex.getCurrentAssignedResource());
+				vertex.getCurrentExecutionAttempt().switchToRunning();
+			}
 
-		// fail old finished execution, this should not affect the execution
-		finishedExecution.fail(new Exception("This should have no effect"));
+			// fail old finished execution, this should not affect the execution
+			finishedExecution.fail(new Exception("This should have no effect"));
 
-		for (ExecutionVertex vertex: eg.getAllExecutionVertices()) {
-			vertex.getCurrentExecutionAttempt().markFinished();
-		}
+			for (ExecutionVertex vertex: eg.getAllExecutionVertices()) {
+				vertex.getCurrentExecutionAttempt().markFinished();
+			}
 
-		// the state of the finished execution should have not changed since it is terminal
-		assertEquals(ExecutionState.FINISHED, finishedExecution.getState());
+			// the state of the finished execution should have not changed since it is terminal
+			assertEquals(ExecutionState.FINISHED, finishedExecution.getState());
 
-		assertEquals(JobStatus.FINISHED, eg.getState());
+			assertEquals(JobStatus.FINISHED, eg.getState());
+		}
 	}
 
 	/**
@@ -372,43 +351,27 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	 */
 	@Test
 	public void testFailExecutionAfterCancel() throws Exception {
-		Instance instance = ExecutionGraphTestUtils.getInstance(
-			new SimpleAckingTaskManagerGateway(),
-			2);
-
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-		scheduler.newInstanceAvailable(instance);
-
-		JobVertex vertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
-
-		ExecutionConfig executionConfig = new ExecutionConfig();
-		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-			Integer.MAX_VALUE, Integer.MAX_VALUE));
-		JobGraph jobGraph = new JobGraph("Test Job", vertex);
-		jobGraph.setExecutionConfig(executionConfig);
-
-		ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler);
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(new InfiniteDelayRestartStrategy())
+				.setJobGraph(createJobGraphToCancel())
+				.setNumberOfTasks(2)
+				.buildAndScheduleForExecution(slotPool);
 
-		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+			// Fail right after cancel (for example with concurrent slot release)
+			eg.cancel();
 
-		assertEquals(JobStatus.CREATED, eg.getState());
+			for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+				v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+			}
 
-		eg.scheduleForExecution();
-		assertEquals(JobStatus.RUNNING, eg.getState());
+			assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());
 
-		// Fail right after cancel (for example with concurrent slot release)
-		eg.cancel();
+			Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
 
-		for (ExecutionVertex v : eg.getAllExecutionVertices()) {
-			v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+			execution.completeCancelling();
+			assertEquals(JobStatus.CANCELED, eg.getState());
 		}
-
-		assertEquals(JobStatus.CANCELED, eg.getTerminationFuture().get());
-
-		Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
-
-		execution.completeCancelling();
-		assertEquals(JobStatus.CANCELED, eg.getState());
 	}
 
 	/**
@@ -417,41 +380,25 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	 */
 	@Test
 	public void testFailExecutionGraphAfterCancel() throws Exception {
-		Instance instance = ExecutionGraphTestUtils.getInstance(
-			new SimpleAckingTaskManagerGateway(),
-			2);
-
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-		scheduler.newInstanceAvailable(instance);
-
-		JobVertex vertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
-
-		ExecutionConfig executionConfig = new ExecutionConfig();
-		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-			Integer.MAX_VALUE, Integer.MAX_VALUE));
-		JobGraph jobGraph = new JobGraph("Test Job", vertex);
-		jobGraph.setExecutionConfig(executionConfig);
-
-		ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler);
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(new InfiniteDelayRestartStrategy())
+				.setJobGraph(createJobGraphToCancel())
+				.setNumberOfTasks(2)
+				.buildAndScheduleForExecution(slotPool);
 
-		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+			// Fail right after cancel (for example with concurrent slot release)
+			eg.cancel();
+			assertEquals(JobStatus.CANCELLING, eg.getState());
 
-		assertEquals(JobStatus.CREATED, eg.getState());
+			eg.failGlobal(new Exception("Test Exception"));
+			assertEquals(JobStatus.FAILING, eg.getState());
 
-		eg.scheduleForExecution();
-		assertEquals(JobStatus.RUNNING, eg.getState());
-
-		// Fail right after cancel (for example with concurrent slot release)
-		eg.cancel();
-		assertEquals(JobStatus.CANCELLING, eg.getState());
-
-		eg.failGlobal(new Exception("Test Exception"));
-		assertEquals(JobStatus.FAILING, eg.getState());
+			Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
 
-		Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
-
-		execution.completeCancelling();
-		assertEquals(JobStatus.RESTARTING, eg.getState());
+			execution.completeCancelling();
+			assertEquals(JobStatus.RESTARTING, eg.getState());
+		}
 	}
 
 	/**
@@ -459,55 +406,29 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	 */
 	@Test
 	public void testSuspendWhileRestarting() throws Exception {
-
-		Instance instance = ExecutionGraphTestUtils.getInstance(
-			new SimpleAckingTaskManagerGateway(),
-			NUM_TASKS);
-
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-		scheduler.newInstanceAvailable(instance);
-
-		JobVertex sender = new JobVertex("Task");
-		sender.setInvokableClass(NoOpInvokable.class);
-		sender.setParallelism(NUM_TASKS);
-
-		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
-
 		TestRestartStrategy controllableRestartStrategy = TestRestartStrategy.manuallyTriggered();
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+			ExecutionGraph eg = TestingExecutionGraphBuilder.newBuilder()
+				.setRestartStrategy(controllableRestartStrategy)
+				.setTaskManagerLocation(taskManagerLocation)
+				.buildAndScheduleForExecution(slotPool);
 
-		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutor(),
-			TestingUtils.defaultExecutor(),
-			new JobID(),
-			"Test job",
-			new Configuration(),
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			controllableRestartStrategy,
-			scheduler);
-
-		eg.start(mainThreadExecutor);
-		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-
-		assertEquals(JobStatus.CREATED, eg.getState());
-
-		eg.scheduleForExecution();
-
-		assertEquals(JobStatus.RUNNING, eg.getState());
-
-		instance.markDead();
+			// Release the TaskManager and wait for the job to restart
+			slotPool.releaseTaskManager(taskManagerLocation.getResourceID(), new Exception("Test Exception"));
 
-		Assert.assertEquals(1, controllableRestartStrategy.getNumberOfQueuedActions());
+			assertEquals(1, controllableRestartStrategy.getNumberOfQueuedActions());
 
-		assertEquals(JobStatus.RESTARTING, eg.getState());
+			assertEquals(JobStatus.RESTARTING, eg.getState());
 
-		eg.suspend(new Exception("Test exception"));
+			eg.suspend(new Exception("Test exception"));
 
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
+			assertEquals(JobStatus.SUSPENDED, eg.getState());
 
-		controllableRestartStrategy.triggerAll().join();
+			controllableRestartStrategy.triggerAll().join();
 
-		assertEquals(JobStatus.SUSPENDED, eg.getState());
+			assertEquals(JobStatus.SUSPENDED, eg.getState());
+		}
 	}
 
 	@Test
@@ -518,7 +439,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
 
 		final ExecutionGraph eg = createSimpleTestGraph(
-			new JobID(),
+			TEST_JOB_ID,
 			taskManagerGateway,
 			triggeredRestartStrategy,
 			createNoOpVertex(parallelism));
@@ -538,15 +459,15 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		first.fail(new Exception("intended test failure 1"));
 		last.fail(new Exception("intended test failure 2"));
 
-		Assert.assertEquals(JobStatus.FAILING, eg.getState());
+		assertEquals(JobStatus.FAILING, eg.getState());
 
 		completeCancellingForAllVertices(eg);
 
 		// Now trigger the restart
-		Assert.assertEquals(1, triggeredRestartStrategy.getNumberOfQueuedActions());
+		assertEquals(1, triggeredRestartStrategy.getNumberOfQueuedActions());
 		triggeredRestartStrategy.triggerAll().join();
 
-		Assert.assertEquals(JobStatus.RUNNING, eg.getState());
+		assertEquals(JobStatus.RUNNING, eg.getState());
 
 		switchToRunning(eg);
 		finishAllVertices(eg);
@@ -558,13 +479,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	@Test
 	public void testGlobalFailAndRestarts() throws Exception {
 		final int parallelism = 10;
-		final JobID jid = new JobID();
 		final JobVertex vertex = createNoOpVertex(parallelism);
 		final NotCancelAckingTaskGateway taskManagerGateway = new NotCancelAckingTaskGateway();
-		final SlotProvider slots = new SimpleSlotProvider(jid, parallelism, taskManagerGateway);
+		final SlotProvider slots = new SimpleSlotProvider(TEST_JOB_ID, parallelism, taskManagerGateway);
 		final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered();
 
-		final ExecutionGraph eg = createSimpleTestGraph(jid, slots, restartStrategy, vertex);
+		final ExecutionGraph eg = createSimpleTestGraph(TEST_JOB_ID, slots, restartStrategy, vertex);
 		eg.start(mainThreadExecutor);
 
 		eg.setScheduleMode(ScheduleMode.EAGER);
@@ -603,55 +523,57 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		// this test is inconclusive if not used with a proper multi-threaded executor
 		assertTrue("test assumptions violated", ((ThreadPoolExecutor) executor).getCorePoolSize() > 1);
 
-		SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
 		final int parallelism = 20;
-		final Scheduler scheduler = createSchedulerWithInstances(parallelism, taskManagerGateway);
 
-		final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			final Scheduler scheduler = createSchedulerWithSlots(parallelism, slotPool, new LocalTaskManagerLocation());
 
-		final JobVertex source = new JobVertex("source");
-		source.setInvokableClass(NoOpInvokable.class);
-		source.setParallelism(parallelism);
-		source.setSlotSharingGroup(sharingGroup);
+			final SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
-		final JobVertex sink = new JobVertex("sink");
-		sink.setInvokableClass(NoOpInvokable.class);
-		sink.setParallelism(parallelism);
-		sink.setSlotSharingGroup(sharingGroup);
-		sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+			final JobVertex source = new JobVertex("source");
+			source.setInvokableClass(NoOpInvokable.class);
+			source.setParallelism(parallelism);
+			source.setSlotSharingGroup(sharingGroup);
 
-		TestRestartStrategy restartStrategy = TestRestartStrategy.directExecuting();
+			final JobVertex sink = new JobVertex("sink");
+			sink.setInvokableClass(NoOpInvokable.class);
+			sink.setParallelism(parallelism);
+			sink.setSlotSharingGroup(sharingGroup);
+			sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
 
-		final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
-			new JobID(),
-			scheduler,
-			restartStrategy,
-			executor,
-			source,
-			sink);
+			TestRestartStrategy restartStrategy = TestRestartStrategy.directExecuting();
 
-		eg.start(mainThreadExecutor);
+			final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+				TEST_JOB_ID,
+				scheduler,
+				restartStrategy,
+				executor,
+				source,
+				sink);
 
-		eg.setScheduleMode(ScheduleMode.EAGER);
-		eg.scheduleForExecution();
+			eg.start(mainThreadExecutor);
 
-		switchToRunning(eg);
+			eg.setScheduleMode(ScheduleMode.EAGER);
+			eg.scheduleForExecution();
 
-		// fail into 'RESTARTING'
-		eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
-			new Exception("intended test failure"));
+			switchToRunning(eg);
 
-		assertEquals(JobStatus.FAILING, eg.getState());
+			// fail into 'RESTARTING'
+			eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt().fail(
+				new Exception("intended test failure"));
 
-		completeCancellingForAllVertices(eg);
+			assertEquals(JobStatus.FAILING, eg.getState());
 
-		assertEquals(JobStatus.RUNNING, eg.getState());
+			completeCancellingForAllVertices(eg);
 
-		// clean termination
-		switchToRunning(eg);
-		finishAllVertices(eg);
+			assertEquals(JobStatus.RUNNING, eg.getState());
 
-		assertEquals(JobStatus.FINISHED, eg.getState());
+			// clean termination
+			switchToRunning(eg);
+			finishAllVertices(eg);
+
+			assertEquals(JobStatus.FINISHED, eg.getState());
+		}
 	}
 
 	@Test
@@ -662,43 +584,44 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		final int numRestarts = 10;
 		final int parallelism = 20;
 
-		TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
-		final Scheduler scheduler = createSchedulerWithInstances(parallelism - 1, taskManagerGateway);
-
-		final SlotSharingGroup sharingGroup = new SlotSharingGroup();
+		try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) {
+			final Scheduler scheduler = createSchedulerWithSlots(
+				parallelism - 1, slotPool, new LocalTaskManagerLocation());
 
-		final JobVertex source = new JobVertex("source");
-		source.setInvokableClass(NoOpInvokable.class);
-		source.setParallelism(parallelism);
-		source.setSlotSharingGroup(sharingGroup);
+			final SlotSharingGroup sharingGroup = new SlotSharingGroup();
 
-		final JobVertex sink = new JobVertex("sink");
-		sink.setInvokableClass(NoOpInvokable.class);
-		sink.setParallelism(parallelism);
-		sink.setSlotSharingGroup(sharingGroup);
-		sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
+			final JobVertex source = new JobVertex("source");
+			source.setInvokableClass(NoOpInvokable.class);
+			source.setParallelism(parallelism);
+			source.setSlotSharingGroup(sharingGroup);
 
-		TestRestartStrategy restartStrategy =
-			new TestRestartStrategy(numRestarts, false);
+			final JobVertex sink = new JobVertex("sink");
+			sink.setInvokableClass(NoOpInvokable.class);
+			sink.setParallelism(parallelism);
+			sink.setSlotSharingGroup(sharingGroup);
+			sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
 
+			TestRestartStrategy restartStrategy =
+				new TestRestartStrategy(numRestarts, false);
 
-		final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
-			new JobID(), scheduler, restartStrategy, executor, source, sink);
+			final ExecutionGraph eg = ExecutionGraphTestUtils.createExecutionGraph(
+				TEST_JOB_ID, scheduler, restartStrategy, executor, source, sink);
 
-		eg.start(mainThreadExecutor);
-		eg.setScheduleMode(ScheduleMode.EAGER);
-		eg.scheduleForExecution();
+			eg.start(mainThreadExecutor);
+			eg.setScheduleMode(ScheduleMode.EAGER);
+			eg.scheduleForExecution();
 
-		// wait until no more changes happen
-		while (eg.getNumberOfFullRestarts() < numRestarts) {
-			Thread.sleep(1);
-		}
+			// wait until no more changes happen
+			while (eg.getNumberOfFullRestarts() < numRestarts) {
+				Thread.sleep(1);
+			}
 
-		assertEquals(JobStatus.FAILED, eg.getState());
+			assertEquals(JobStatus.FAILED, eg.getState());
 
-		final Throwable t = eg.getFailureCause();
-		if (!(t instanceof NoResourceAvailableException)) {
-			ExceptionUtils.rethrowException(t, t.getMessage());
+			final Throwable t = eg.getFailureCause();
+			if (!(t instanceof NoResourceAvailableException)) {
+				ExceptionUtils.rethrowException(t, t.getMessage());
+			}
 		}
 	}
 
@@ -710,7 +633,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	public void testFailureWhileRestarting() throws Exception {
 
 		final TestRestartStrategy restartStrategy = TestRestartStrategy.manuallyTriggered();
-		final ExecutionGraph executionGraph = createSimpleExecutionGraph(restartStrategy, new TestingSlotProvider(ignored -> new CompletableFuture<>()));
+		final ExecutionGraph executionGraph = createSimpleExecutionGraph(
+			restartStrategy, new TestingSlotProvider(ignored -> new CompletableFuture<>()), createJobGraph());
 
 		executionGraph.start(mainThreadExecutor);
 		executionGraph.setQueuedSchedulingAllowed(true);
@@ -733,66 +657,100 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private Scheduler createSchedulerWithInstances(int num, TaskManagerGateway taskManagerGateway) {
-		final Scheduler scheduler = new Scheduler(executor);
-		final Instance[] instances = new Instance[num];
+	private static class TestingExecutionGraphBuilder {
+		private RestartStrategy restartStrategy = new NoRestartStrategy();
+		private JobGraph jobGraph = createJobGraph();
+		private int tasksNum = NUM_TASKS;
+		private TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
 
-		for (int i = 0; i < instances.length; i++) {
-			instances[i] = createInstance(taskManagerGateway, 55443 + i);
-			scheduler.newInstanceAvailable(instances[i]);
+		private TestingExecutionGraphBuilder setRestartStrategy(RestartStrategy restartStrategy) {
+			this.restartStrategy = restartStrategy;
+			return this;
 		}
 
-		return scheduler;
-	}
+		private TestingExecutionGraphBuilder setJobGraph(JobGraph jobGraph) {
+			this.jobGraph = jobGraph;
+			return this;
+		}
 
-	private static Instance createInstance(TaskManagerGateway taskManagerGateway, int port) {
-		final HardwareDescription resources = new HardwareDescription(4, 1_000_000_000, 500_000_000, 400_000_000);
-		final TaskManagerLocation location = new TaskManagerLocation(
-			ResourceID.generate(), InetAddress.getLoopbackAddress(), port);
-		return new Instance(taskManagerGateway, location, new InstanceID(), resources, 1);
-	}
+		TestingExecutionGraphBuilder setNumberOfTasks(@SuppressWarnings("SameParameterValue") int tasksNum) {
+			this.tasksNum = tasksNum;
+			return this;
+		}
 
-	// ------------------------------------------------------------------------
+		private TestingExecutionGraphBuilder setTaskManagerLocation(TaskManagerLocation taskManagerLocation) {
+			this.taskManagerLocation = taskManagerLocation;
+			return this;
+		}
 
-	private Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy) throws Exception {
-		Instance instance = ExecutionGraphTestUtils.getInstance(
-			new SimpleAckingTaskManagerGateway(),
-			NUM_TASKS);
+		private static TestingExecutionGraphBuilder newBuilder() {
+			return new TestingExecutionGraphBuilder();
+		}
 
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-		scheduler.newInstanceAvailable(instance);
+		private ExecutionGraph buildAndScheduleForExecution(SlotPool slotPool) throws Exception {
+			final Scheduler scheduler = createSchedulerWithSlots(tasksNum, slotPool, taskManagerLocation);
+			final ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler, jobGraph);
 
-		ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler);
+			assertEquals(JobStatus.CREATED, eg.getState());
 
-		assertEquals(JobStatus.CREATED, eg.getState());
+			eg.scheduleForExecution();
+			assertEquals(JobStatus.RUNNING, eg.getState());
 
-		eg.scheduleForExecution();
-		assertEquals(JobStatus.RUNNING, eg.getState());
-		return new Tuple2<>(eg, instance);
+			return eg;
+		}
 	}
 
-	private ExecutionGraph createSimpleExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException, JobException {
-		JobGraph jobGraph = createJobGraph(NUM_TASKS);
+	private static Scheduler createSchedulerWithSlots(
+		int numSlots, SlotPool slotPool, TaskManagerLocation taskManagerLocation) throws Exception {
 
-		ExecutionGraph eg = newExecutionGraph(restartStrategy, slotProvider);
-		eg.start(mainThreadExecutor);
-		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+		final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+		setupSlotPool(slotPool);
+		Scheduler scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.INSTANCE, slotPool);
+		scheduler.start(mainThreadExecutor);
+		slotPool.registerTaskManager(taskManagerLocation.getResourceID());
 
-		return eg;
+		final List<SlotOffer> slotOffers = new ArrayList<>(NUM_TASKS);
+		for (int i = 0; i < numSlots; i++) {
+			final AllocationID allocationId = new AllocationID();
+			final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
+			slotOffers.add(slotOffer);
+		}
+
+		slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
+
+		return scheduler;
 	}
 
-	@Nonnull
-	private static JobGraph createJobGraph(int parallelism) {
-		JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task", parallelism, NoOpInvokable.class);
+	private static void setupSlotPool(SlotPool slotPool) throws Exception {
+		final String jobManagerAddress = "foobar";
+		final ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		slotPool.start(JobMasterId.generate(), jobManagerAddress, mainThreadExecutor);
+		slotPool.connectToResourceManager(resourceManagerGateway);
+	}
 
+	private static JobGraph createJobGraph() {
+		JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task", NUM_TASKS, NoOpInvokable.class);
 		return new JobGraph("Pointwise job", sender);
 	}
 
-	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException {
-		final ExecutionGraph executionGraph = new ExecutionGraph(
+	private static JobGraph createJobGraphToCancel() throws IOException {
+		JobVertex vertex = ExecutionGraphTestUtils.createJobVertex("Test Vertex", 1, NoOpInvokable.class);
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
+			Integer.MAX_VALUE, Integer.MAX_VALUE));
+		JobGraph jobGraph = new JobGraph("Test Job", vertex);
+		jobGraph.setExecutionConfig(executionConfig);
+		return jobGraph;
+	}
+
+	private static ExecutionGraph createSimpleExecutionGraph(
+		RestartStrategy restartStrategy, SlotProvider slotProvider, JobGraph jobGraph)
+		throws IOException, JobException {
+
+		ExecutionGraph executionGraph = new ExecutionGraph(
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
-			new JobID(),
+			TEST_JOB_ID,
 			"Test job",
 			new Configuration(),
 			new SerializedValue<>(new ExecutionConfig()),
@@ -800,12 +758,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			restartStrategy,
 			slotProvider);
 
-		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+		executionGraph.start(mainThreadExecutor);
+		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		return executionGraph;
 	}
 
-	private void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) {
+	private void restartAfterFailure(ExecutionGraph eg) {
 		eg.start(mainThreadExecutor);
 		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
 
@@ -817,12 +776,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
-		if (haltAfterRestart) {
-			haltExecution(eg);
-		}
-	}
-
-	private static void haltExecution(ExecutionGraph eg) {
 		finishAllVertices(eg);
 		assertEquals(JobStatus.FINISHED, eg.getState());
 	}