You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/03/02 11:24:22 UTC

[2/2] flink git commit: [FLINK-5934] Set the Scheduler in the ExecutionGraph via its constructor

[FLINK-5934] Set the Scheduler in the ExecutionGraph via its constructor

Before the scheduler was set when calling ExecutionGraph.scheduleForExecution(). This
has the disadvantage that the ExecutionGraph has not scheduler set if something else
went wrong before the scheduleForExecution call. Consequently, the job will be stuck
in a restart loop because the recovery will fail if there is no Scheduler set. In
order to solve the problem, the Scheduler is not passed to the ExecutionGraph when
it is created.

This closes #3437.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c968563
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c968563
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c968563

Branch: refs/heads/master
Commit: 5c968563df26642f81ca94df391f31c51b4f37e6
Parents: cde3cdd
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Feb 28 15:20:47 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Mar 2 12:23:10 2017 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  64 ++++---
 .../executiongraph/ExecutionGraphBuilder.java   |   3 +
 .../flink/runtime/jobmaster/JobMaster.java      |  33 ++--
 .../flink/runtime/jobmanager/JobManager.scala   |   3 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   2 +
 .../ArchivedExecutionGraphTest.java             |   5 +-
 .../ExecutionGraphConstructionTest.java         |  25 ++-
 .../ExecutionGraphDeploymentTest.java           |  47 ++---
 .../ExecutionGraphMetricsTest.java              |   3 +-
 .../ExecutionGraphRestartTest.java              |  39 +++--
 .../ExecutionGraphSchedulingTest.java           | 173 +++++++++----------
 .../ExecutionGraphSignalsTest.java              |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |   5 +-
 .../ExecutionStateProgressTest.java             |   4 +-
 .../ExecutionVertexLocalityTest.java            |  26 +--
 .../executiongraph/LegacyJobVertexIdTest.java   |  20 ++-
 .../executiongraph/PointwisePatternTest.java    |  22 ++-
 .../TerminalStateDeadlockTest.java              |   5 +-
 .../executiongraph/VertexSlotSharingTest.java   |   4 +-
 .../TaskManagerLossFailsTasksTest.scala         |   5 +-
 .../partitioner/RescalePartitionerTest.java     |   2 +
 21 files changed, 274 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a76a421..5fa40fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -141,6 +141,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 */
 	private final SerializedValue<JobInformation> serializedJobInformation;
 
+	/** The executor which is used to execute futures. */
+	private final ScheduledExecutorService futureExecutor;
+
+	/** The executor which is used to execute blocking io operations */
+	private final Executor ioExecutor;
+
 	/** {@code true} if all source tasks are stoppable. */
 	private boolean isStoppable = true;
 
@@ -172,6 +178,18 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** The timeout for all messages that require a response/acknowledgement */
 	private final Time rpcCallTimeout;
 
+	/** Strategy to use for restarts */
+	private final RestartStrategy restartStrategy;
+
+	/** The slot provider to use for allocating slots for tasks as they are needed */
+	private final SlotProvider slotProvider;
+
+	/** The classloader for the user code. Needed for calls into user code classes */
+	private final ClassLoader userClassLoader;
+
+	/** Registered KvState instances reported by the TaskManagers. */
+	private final KvStateLocationRegistry kvStateLocationRegistry;
+
 	// ------ Configuration of the Execution -------
 
 	/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
@@ -199,31 +217,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 	// ------ Fields that are relevant to the execution and need to be cleared before archiving  -------
 
-	/** The slot provider to use for allocating slots for tasks as they are needed */
-	private SlotProvider slotProvider;
-
-	/** Strategy to use for restarts */
-	private RestartStrategy restartStrategy;
-
-	/** The classloader for the user code. Needed for calls into user code classes */
-	private ClassLoader userClassLoader;
-
 	/** The coordinator for checkpoints, if snapshot checkpoints are enabled */
 	private CheckpointCoordinator checkpointCoordinator;
 
 	/** Checkpoint stats tracker separate from the coordinator in order to be
 	 * available after archiving. */
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
 	private CheckpointStatsTracker checkpointStatsTracker;
 
-	/** The executor which is used to execute futures. */
-	private final ScheduledExecutorService futureExecutor;
-
-	/** The executor which is used to execute blocking io operations */
-	private final Executor ioExecutor;
-
-	/** Registered KvState instances reported by the TaskManagers. */
-	private KvStateLocationRegistry kvStateLocationRegistry;
-
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private String jsonPlan;
 
@@ -242,7 +243,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			Configuration jobConfig,
 			SerializedValue<ExecutionConfig> serializedConfig,
 			Time timeout,
-			RestartStrategy restartStrategy) throws IOException {
+			RestartStrategy restartStrategy,
+			SlotProvider slotProvider) throws IOException {
 		this(
 			futureExecutor,
 			ioExecutor,
@@ -254,6 +256,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			restartStrategy,
 			Collections.<BlobKey>emptyList(),
 			Collections.<URL>emptyList(),
+			slotProvider,
 			ExecutionGraph.class.getClassLoader(),
 			new UnregisteredMetricsGroup()
 		);
@@ -270,6 +273,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			RestartStrategy restartStrategy,
 			List<BlobKey> requiredJarFiles,
 			List<URL> requiredClasspaths,
+			SlotProvider slotProvider,
 			ClassLoader userClassLoader,
 			MetricGroup metricGroup) throws IOException {
 
@@ -277,7 +281,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		checkNotNull(jobId);
 		checkNotNull(jobName);
 		checkNotNull(jobConfig);
-		checkNotNull(userClassLoader);
 
 		this.jobInformation = new JobInformation(
 			jobId,
@@ -293,12 +296,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
 		this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
 
-		this.userClassLoader = userClassLoader;
+		this.slotProvider = Preconditions.checkNotNull(slotProvider, "scheduler");
+		this.userClassLoader = Preconditions.checkNotNull(userClassLoader, "userClassLoader");
 
-		this.tasks = new ConcurrentHashMap<JobVertexID, ExecutionJobVertex>();
-		this.intermediateResults = new ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>();
-		this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
-		this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>();
+		this.tasks = new ConcurrentHashMap<>(16);
+		this.intermediateResults = new ConcurrentHashMap<>(16);
+		this.verticesInCreationOrder = new ArrayList<>(16);
+		this.currentExecutions = new ConcurrentHashMap<>(16);
 
 		this.jobStatusListeners  = new CopyOnWriteArrayList<>();
 		this.executionListeners = new CopyOnWriteArrayList<>();
@@ -732,15 +736,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
-		checkNotNull(slotProvider);
-
-		if (this.slotProvider != null && this.slotProvider != slotProvider) {
-			throw new IllegalArgumentException("Cannot use different slot providers for the same job");
-		}
+	public void scheduleForExecution() throws JobException {
 
 		if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
-			this.slotProvider = slotProvider;
 
 			switch (scheduleMode) {
 
@@ -1070,7 +1068,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				}
 			}
 
-			scheduleForExecution(slotProvider);
+			scheduleForExecution();
 		}
 		catch (Throwable t) {
 			LOG.warn("Failed to restart the job.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 2a79302..ec7103c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -68,6 +69,7 @@ public class ExecutionGraphBuilder {
 			Configuration jobManagerConfig,
 			ScheduledExecutorService futureExecutor,
 			Executor ioExecutor,
+			SlotProvider slotProvider,
 			ClassLoader classLoader,
 			CheckpointRecoveryFactory recoveryFactory,
 			Time timeout,
@@ -98,6 +100,7 @@ public class ExecutionGraphBuilder {
 						restartStrategy,
 						jobGraph.getUserJarBlobKeys(),
 						jobGraph.getClasspaths(),
+						slotProvider,
 						classLoader,
 						metrics);
 		} catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 941248f..145216d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -232,26 +232,27 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
 
+		this.slotPool = new SlotPool(rpcService, jobGraph.getJobID());
+		this.slotPoolGateway = slotPool.getSelf();
+
 		this.executionGraph = ExecutionGraphBuilder.buildGraph(
-				null,
-				jobGraph,
-				configuration,
-				executorService,
-				executorService,
-				userCodeLoader,
-				checkpointRecoveryFactory,
-				rpcAskTimeout,
-				restartStrategy,
-				jobMetricGroup,
-				-1,
-				log);
+			null,
+			jobGraph,
+			configuration,
+			executorService,
+			executorService,
+			slotPool.getSlotProvider(),
+			userCodeLoader,
+			checkpointRecoveryFactory,
+			rpcAskTimeout,
+			restartStrategy,
+			jobMetricGroup,
+			-1,
+			log);
 
 		// register self as job status change listener
 		executionGraph.registerJobStatusListener(new JobManagerJobStatusListener());
 
-		this.slotPool = new SlotPool(rpcService, jobGraph.getJobID());
-		this.slotPoolGateway = slotPool.getSelf();
-
 		this.registeredTaskManagers = new HashMap<>(4);
 	}
 
@@ -340,7 +341,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			@Override
 			public void run() {
 				try {
-					executionGraph.scheduleForExecution(slotPool.getSlotProvider());
+					executionGraph.scheduleForExecution();
 				}
 				catch (Throwable t) {
 					executionGraph.fail(t);

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 87cd4ac..9d53aa2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1292,6 +1292,7 @@ class JobManager(
           flinkConfiguration,
           futureExecutor,
           ioExecutor,
+          scheduler,
           userCodeLoader,
           checkpointRecoveryFactory,
           Time.of(timeout.length, timeout.unit),
@@ -1410,7 +1411,7 @@ class JobManager(
             // the job.
             log.info(s"Scheduling job $jobId ($jobName).")
 
-            executionGraph.scheduleForExecution(scheduler)
+            executionGraph.scheduleForExecution()
           } else {
             // Remove the job graph. Otherwise it will be lingering around and possibly removed from
             // ZooKeeper by this JM.

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 8f565dd..98b4c4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.junit.AfterClass;
@@ -103,6 +104,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			new NoRestartStrategy(),
 			Collections.<BlobKey>emptyList(),
 			Collections.<URL>emptyList(),
+			new Scheduler(TestingUtils.defaultExecutionContext()),
 			ClassLoader.getSystemClassLoader(),
 			new UnregisteredMetricsGroup());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index b3e9d5d..077ab53 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -103,7 +104,9 @@ public class ArchivedExecutionGraphTest {
 			new Configuration(),
 			new SerializedValue<>(config),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			mock(SlotProvider.class));
+
 		runtimeGraph.attachJobGraph(vertices);
 
 		List<ExecutionJobVertex> jobVertices = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index aed1095..fa48384 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -119,7 +120,8 @@ public class ExecutionGraphConstructionTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -169,7 +171,8 @@ public class ExecutionGraphConstructionTest {
 			cfg,
 				new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -244,7 +247,8 @@ public class ExecutionGraphConstructionTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -504,7 +508,8 @@ public class ExecutionGraphConstructionTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -569,7 +574,8 @@ public class ExecutionGraphConstructionTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 			fail("Attached wrong jobgraph");
@@ -638,7 +644,8 @@ public class ExecutionGraphConstructionTest {
 				cfg,
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			try {
 				eg.attachJobGraph(ordered);
 			}
@@ -685,7 +692,8 @@ public class ExecutionGraphConstructionTest {
 				cfg,
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 
 			try {
 				eg.attachJobGraph(ordered);
@@ -767,7 +775,8 @@ public class ExecutionGraphConstructionTest {
 				cfg,
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			
 			eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 6b05987..f119671 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -97,7 +97,8 @@ public class ExecutionGraphDeploymentTest {
 				new Configuration(),
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 
 			List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
 
@@ -312,6 +313,15 @@ public class ExecutionGraphDeploymentTest {
 
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
 
+		Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
+		for (int i = 0; i < dop1; i++) {
+			scheduler.newInstanceAvailable(
+				ExecutionGraphTestUtils.getInstance(
+					new ActorTaskManagerGateway(
+						new ExecutionGraphTestUtils.SimpleActorGateway(
+							TestingUtils.directExecutionContext()))));
+		}
+
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
 			new DirectScheduledExecutorService(),
@@ -321,25 +331,18 @@ public class ExecutionGraphDeploymentTest {
 			new Configuration(),
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			scheduler);
 
 		eg.setQueuedSchedulingAllowed(false);
 
 		List<JobVertex> ordered = Arrays.asList(v1, v2);
 		eg.attachJobGraph(ordered);
 
-		Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
-		for (int i = 0; i < dop1; i++) {
-			scheduler.newInstanceAvailable(
-				ExecutionGraphTestUtils.getInstance(
-					new ActorTaskManagerGateway(
-						new ExecutionGraphTestUtils.SimpleActorGateway(
-							TestingUtils.directExecutionContext()))));
-		}
 		assertEquals(dop1, scheduler.getNumberOfAvailableSlots());
 
 		// schedule, this triggers mock deployment
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 
 		ExecutionAttemptID attemptID = eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
 		eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING));
@@ -357,6 +360,15 @@ public class ExecutionGraphDeploymentTest {
 		v1.setInvokableClass(BatchTask.class);
 		v2.setInvokableClass(BatchTask.class);
 
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		for (int i = 0; i < dop1 + dop2; i++) {
+			scheduler.newInstanceAvailable(
+				ExecutionGraphTestUtils.getInstance(
+					new ActorTaskManagerGateway(
+						new ExecutionGraphTestUtils.SimpleActorGateway(
+							TestingUtils.directExecutionContext()))));
+		}
+
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
 			new DirectScheduledExecutorService(),
@@ -366,25 +378,18 @@ public class ExecutionGraphDeploymentTest {
 			new Configuration(), 
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			scheduler);
 		
 		eg.setQueuedSchedulingAllowed(false);
 
 		List<JobVertex> ordered = Arrays.asList(v1, v2);
 		eg.attachJobGraph(ordered);
 
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-		for (int i = 0; i < dop1 + dop2; i++) {
-			scheduler.newInstanceAvailable(
-				ExecutionGraphTestUtils.getInstance(
-					new ActorTaskManagerGateway(
-						new ExecutionGraphTestUtils.SimpleActorGateway(
-							TestingUtils.directExecutionContext()))));
-		}
 		assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots());
 
 		// schedule, this triggers mock deployment
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 
 		Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions();
 		assertEquals(dop1 + dop2, executions.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 6b28984..203c547 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -162,6 +162,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 				testingRestartStrategy,
 				Collections.<BlobKey>emptyList(),
 				Collections.<URL>emptyList(),
+			scheduler,
 				getClass().getClassLoader(),
 				metricGroup);
 	
@@ -180,7 +181,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 			executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 	
 			// start execution
-			executionGraph.scheduleForExecution(scheduler);
+		executionGraph.scheduleForExecution();
 
 			assertTrue(0L == restartingTime.getValue());
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index e4f49bb..1729582 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -116,12 +116,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		
 		//initiate and schedule job
 		JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, groupVertex2);
-		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L));
+		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L), scheduler);
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 		
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 		
 		//sanity checks
@@ -240,7 +240,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			// We want to manually control the restart and delay
-			new InfiniteDelayRestartStrategy());
+			new InfiniteDelayRestartStrategy(),
+			scheduler);
 
 		JobVertex jobVertex = new JobVertex("NoOpInvokable");
 		jobVertex.setInvokableClass(NoOpInvokable.class);
@@ -252,7 +253,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		assertEquals(JobStatus.CREATED, executionGraph.getState());
 
-		executionGraph.scheduleForExecution(scheduler);
+		executionGraph.scheduleForExecution();
 
 		assertEquals(JobStatus.RUNNING, executionGraph.getState());
 
@@ -384,12 +385,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		JobVertex sender = newJobVertex("Task1", 1, NoOpInvokable.class);
 		JobVertex receiver = newJobVertex("Task2", 1, NoOpInvokable.class);
 		JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
-		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000));
+		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000), scheduler);
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
 		Iterator<ExecutionVertex> executionVertices = eg.getAllExecutionVertices().iterator();
@@ -453,13 +454,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		JobGraph jobGraph = new JobGraph("Test Job", vertex);
 		jobGraph.setExecutionConfig(executionConfig);
 
-		ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy());
+		ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler);
 
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
 		// Fail right after cancel (for example with concurrent slot release)
@@ -499,13 +500,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		JobGraph jobGraph = new JobGraph("Test Job", vertex);
 		jobGraph.setExecutionConfig(executionConfig);
 
-		ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy());
+		ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler);
 
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
 		// Fail right after cancel (for example with concurrent slot release)
@@ -555,13 +556,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			new Configuration(),
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			controllableRestartStrategy);
+			controllableRestartStrategy,
+			scheduler);
 
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
@@ -660,7 +662,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
-		ExecutionGraph eg = newExecutionGraph(restartStrategy);
+		ExecutionGraph eg = newExecutionGraph(restartStrategy, scheduler);
 		if (isSpy) {
 			eg = spy(eg);
 		}
@@ -668,7 +670,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
-		eg.scheduleForExecution(scheduler);
+		eg.scheduleForExecution();
 		assertEquals(JobStatus.RUNNING, eg.getState());
 		return new Tuple2<>(eg, instance);
 	}
@@ -680,7 +682,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		return groupVertex;
 	}
 
-	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
+	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, Scheduler scheduler) throws IOException {
 		return new ExecutionGraph(
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
@@ -689,7 +691,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
 			new Configuration(),
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			restartStrategy);
+			restartStrategy,
+			scheduler);
+	}
+
+	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
+		return newExecutionGraph(restartStrategy, new Scheduler(TestingUtils.defaultExecutionContext()));
 	}
 
 	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 9834dc6..c8a0422 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -109,9 +109,15 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final JobID jobId = new JobID();
 		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
 
-		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+		final FlinkCompletableFuture<SimpleSlot> sourceFuture = new FlinkCompletableFuture<>();
+		final FlinkCompletableFuture<SimpleSlot> targetFuture = new FlinkCompletableFuture<>();
+
+		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+		slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
+		slotProvider.addSlot(targetVertex.getID(), 0, targetFuture);
+
+		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
 
-		//
 		//  set up two TaskManager gateways and slots
 
 		final TaskManagerGateway gatewaySource = createTaskManager();
@@ -120,16 +126,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final SimpleSlot sourceSlot = createSlot(gatewaySource, jobId);
 		final SimpleSlot targetSlot = createSlot(gatewayTarget, jobId);
 
-		final FlinkCompletableFuture<SimpleSlot> sourceFuture = new FlinkCompletableFuture<>();
-		final FlinkCompletableFuture<SimpleSlot> targetFuture = new FlinkCompletableFuture<>();
-
-		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
-		slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
-		slotProvider.addSlot(targetVertex.getID(), 0, targetFuture);
-
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.setQueuedSchedulingAllowed(true);
-		eg.scheduleForExecution(slotProvider);
+		eg.scheduleForExecution();
 
 		// job should be running
 		assertEquals(JobStatus.RUNNING, eg.getState());
@@ -177,7 +176,10 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final JobID jobId = new JobID();
 		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
 
-		final ExecutionGraph eg = createExecutionGraph(jobGraph);
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism];
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism];
 
 		//
 		//  Create the slots, futures, and the slot provider
@@ -188,11 +190,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
 		final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
 
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism];
-		@SuppressWarnings({"unchecked", "rawtypes"})
-		final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism];
-
 		for (int i = 0; i < parallelism; i++) {
 			sourceTaskManagers[i] = createTaskManager();
 			targetTaskManagers[i] = createTaskManager();
@@ -208,6 +205,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
 		slotProvider.addSlots(targetVertex.getID(), targetFutures);
 
+		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
+
 		//
 		//  we complete some of the futures
 
@@ -220,7 +219,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.setQueuedSchedulingAllowed(true);
-		eg.scheduleForExecution(slotProvider);
+		eg.scheduleForExecution();
 
 		verifyNothingDeployed(eg, sourceTaskManagers);
 
@@ -274,10 +273,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final JobID jobId = new JobID();
 		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
 
-		final ExecutionGraph eg = createExecutionGraph(jobGraph);
-		TerminalJobStatusListener testListener = new TerminalJobStatusListener();
-		eg.registerJobStatusListener(testListener);
-
 		//
 		//  Create the slots, futures, and the slot provider
 
@@ -304,6 +299,10 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		slotProvider.addSlots(sourceVertex.getID(), sourceFutures);
 		slotProvider.addSlots(targetVertex.getID(), targetFutures);
 
+		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
+		TerminalJobStatusListener testListener = new TerminalJobStatusListener();
+		eg.registerJobStatusListener(testListener);
+
 		//
 		//  we complete some of the futures
 
@@ -317,7 +316,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.setQueuedSchedulingAllowed(true);
-		eg.scheduleForExecution(slotProvider);
+		eg.scheduleForExecution();
 
 		// fail one slot
 		sourceFutures[1].completeExceptionally(new TestRuntimeException());
@@ -356,10 +355,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final JobID jobId = new JobID();
 		final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
 
-		final ExecutionGraph eg = createExecutionGraph(jobGraph, Time.milliseconds(20));
-		final TerminalJobStatusListener statusListener = new TerminalJobStatusListener();
-		eg.registerJobStatusListener(statusListener);
-
 		final SlotOwner slotOwner = mock(SlotOwner.class);
 
 		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
@@ -375,6 +370,10 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
 		slotProvider.addSlots(vertex.getID(), slotFutures);
 
+		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider, Time.milliseconds(20));
+		final TerminalJobStatusListener statusListener = new TerminalJobStatusListener();
+		eg.registerJobStatusListener(statusListener);
+
 		//  we complete one future
 		slotFutures[1].complete(slots[1]);
 
@@ -382,7 +381,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.setQueuedSchedulingAllowed(true);
-		eg.scheduleForExecution(slotProvider);
+		eg.scheduleForExecution();
 
 		//  we complete another future
 		slotFutures[2].complete(slots[2]);
@@ -419,9 +418,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final JobID jobId = new JobID();
 		final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
 
-		final ExecutionGraph eg = createExecutionGraph(jobGraph);
-		final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
-
 		// set up some available slots and some slot owner that accepts released slots back
 		final List<SimpleSlot> returnedSlots = new ArrayList<>();
 		final SlotOwner recycler = new SlotOwner() {
@@ -432,35 +428,37 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 			}
 		};
 
+		// slot provider that hand out parallelism / 3 slots, then throws an exception
+		final SlotProvider slotProvider = mock(SlotProvider.class);
+
 		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
 		final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList(
-				createSlot(taskManager, jobId, recycler),
-				createSlot(taskManager, jobId, recycler),
-				createSlot(taskManager, jobId, recycler)));
-
-
-		// slot provider that hand out parallelism / 3 slots, then throws an exception
-		final SlotProvider slots = mock(SlotProvider.class);
-		
-		when(slots.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
-				new Answer<Future<SimpleSlot>>() {
-
-					@Override
-					public Future<SimpleSlot> answer(InvocationOnMock invocation) {
-						if (availableSlots.isEmpty()) {
-							throw new TestRuntimeException();
-						} else {
-							return FlinkCompletableFuture.completed(availableSlots.remove(0));
-						}
+			createSlot(taskManager, jobId, recycler),
+			createSlot(taskManager, jobId, recycler),
+			createSlot(taskManager, jobId, recycler)));
+
+		when(slotProvider.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
+			new Answer<Future<SimpleSlot>>() {
+
+				@Override
+				public Future<SimpleSlot> answer(InvocationOnMock invocation) {
+					if (availableSlots.isEmpty()) {
+						throw new TestRuntimeException();
+					} else {
+						return FlinkCompletableFuture.completed(availableSlots.remove(0));
 					}
-				});
+				}
+			});
+
+		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
+		final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
 
 		// acquire resources and check that all are back after the failure
 
 		final int numSlotsToExpectBack = availableSlots.size();
 
 		try {
-			ejv.allocateResourcesForAll(slots, false);
+			ejv.allocateResourcesForAll(slotProvider, false);
 			fail("should have failed with an exception");
 		}
 		catch (TestRuntimeException e) {
@@ -471,7 +469,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that the {@link ExecutionGraph#scheduleForExecution(SlotProvider)} method
+	 * Tests that the {@link ExecutionGraph#scheduleForExecution()} method
 	 * releases partially acquired resources upon exception.
 	 */
 	@Test
@@ -495,8 +493,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final JobID jobId = new JobID();
 		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
 
-		final ExecutionGraph eg = createExecutionGraph(jobGraph);
-
 		// set up some available slots and some slot owner that accepts released slots back
 		final List<SimpleSlot> returnedSlots = new ArrayList<>();
 		final SlotOwner recycler = new SlotOwner() {
@@ -509,28 +505,30 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
 		final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList(
-				createSlot(taskManager, jobId, recycler),
-				createSlot(taskManager, jobId, recycler),
-				createSlot(taskManager, jobId, recycler),
-				createSlot(taskManager, jobId, recycler),
-				createSlot(taskManager, jobId, recycler)));
+			createSlot(taskManager, jobId, recycler),
+			createSlot(taskManager, jobId, recycler),
+			createSlot(taskManager, jobId, recycler),
+			createSlot(taskManager, jobId, recycler),
+			createSlot(taskManager, jobId, recycler)));
 
 
 		// slot provider that hand out parallelism / 3 slots, then throws an exception
-		final SlotProvider slots = mock(SlotProvider.class);
-
-		when(slots.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
-				new Answer<Future<SimpleSlot>>() {
-
-					@Override
-					public Future<SimpleSlot> answer(InvocationOnMock invocation) {
-						if (availableSlots.isEmpty()) {
-							throw new TestRuntimeException();
-						} else {
-							return FlinkCompletableFuture.completed(availableSlots.remove(0));
-						}
+		final SlotProvider slotProvider = mock(SlotProvider.class);
+
+		when(slotProvider.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
+			new Answer<Future<SimpleSlot>>() {
+
+				@Override
+				public Future<SimpleSlot> answer(InvocationOnMock invocation) {
+					if (availableSlots.isEmpty()) {
+						throw new TestRuntimeException();
+					} else {
+						return FlinkCompletableFuture.completed(availableSlots.remove(0));
 					}
-				});
+				}
+			});
+
+		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
 
 		// acquire resources and check that all are back after the failure
 
@@ -538,7 +536,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		try {
 			eg.setScheduleMode(ScheduleMode.EAGER);
-			eg.scheduleForExecution(slots);
+			eg.scheduleForExecution();
 			fail("should have failed with an exception");
 		}
 		catch (TestRuntimeException e) {
@@ -552,24 +550,25 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws Exception {
-		return createExecutionGraph(jobGraph, Time.minutes(10));
+	private ExecutionGraph createExecutionGraph(JobGraph jobGraph, SlotProvider slotProvider) throws Exception {
+		return createExecutionGraph(jobGraph, slotProvider, Time.minutes(10));
 	}
 
-	private ExecutionGraph createExecutionGraph(JobGraph jobGraph, Time timeout) throws Exception {
+	private ExecutionGraph createExecutionGraph(JobGraph jobGraph, SlotProvider slotProvider, Time timeout) throws Exception {
 		return ExecutionGraphBuilder.buildGraph(
-				null,
-				jobGraph,
-				new Configuration(),
-				executor,
-				executor,
-				getClass().getClassLoader(),
-				new StandaloneCheckpointRecoveryFactory(),
-				timeout,
-				new NoRestartStrategy(),
-				new UnregisteredMetricsGroup(),
-				1,
-				log);
+			null,
+			jobGraph,
+			new Configuration(),
+			executor,
+			executor,
+			slotProvider,
+			getClass().getClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			timeout,
+			new NoRestartStrategy(),
+			new UnregisteredMetricsGroup(),
+			1,
+			log);
 	}
 
 	private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index b7850fa..64b9aa2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -139,7 +140,8 @@ public class ExecutionGraphSignalsTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		eg.attachJobGraph(ordered);
 
 		f = eg.getClass().getDeclaredField("state");

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index ef94917..b0137fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -46,6 +46,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
@@ -55,6 +56,7 @@ import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
 
 public class ExecutionGraphTestUtils {
 
@@ -183,7 +185,8 @@ public class ExecutionGraphTestUtils {
 			new Configuration(),
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor)));
 
 		ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1,
 				AkkaUtils.getDefaultTimeout()));

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 7427a1d..bd51c81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -59,7 +60,8 @@ public class ExecutionStateProgressTest {
 				new Configuration(),
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			graph.attachJobGraph(Collections.singletonList(ajv));
 
 			setGraphStatus(graph, JobStatus.RUNNING);

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 47863ac..cfd4665 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -204,18 +205,19 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		JobGraph testJob = new JobGraph(jobId, "test job", source, target);
 
 		return ExecutionGraphBuilder.buildGraph(
-				null,
-				testJob,
-				new Configuration(),
-				TestingUtils.defaultExecutor(),
-				TestingUtils.defaultExecutor(),
-				getClass().getClassLoader(),
-				new StandaloneCheckpointRecoveryFactory(),
-				Time.of(10, TimeUnit.SECONDS),
-				new FixedDelayRestartStrategy(10, 0L),
-				new UnregisteredMetricsGroup(),
-				1,
-				log);
+			null,
+			testJob,
+			new Configuration(),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			mock(SlotProvider.class),
+			getClass().getClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			Time.of(10, TimeUnit.SECONDS),
+			new FixedDelayRestartStrategy(10, 0L),
+			new UnregisteredMetricsGroup(),
+			1,
+			log);
 	}
 
 	private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
index 32867bf..89db3a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -49,14 +50,15 @@ public class LegacyJobVertexIdTest {
 		jobVertex.setInvokableClass(AbstractInvokable.class);
 
 		ExecutionGraph executionGraph = new ExecutionGraph(
-				mock(ScheduledExecutorService.class),
-				mock(Executor.class),
-				new JobID(),
-				"test",
-				mock(Configuration.class),
-				mock(SerializedValue.class),
-				Time.seconds(1),
-				mock(RestartStrategy.class));
+			mock(ScheduledExecutorService.class),
+			mock(Executor.class),
+			new JobID(),
+			"test",
+			mock(Configuration.class),
+			mock(SerializedValue.class),
+			Time.seconds(1),
+			mock(RestartStrategy.class),
+			mock(SlotProvider.class));
 
 		ExecutionJobVertex executionJobVertex =
 				new ExecutionJobVertex(executionGraph, jobVertex, 1, Time.seconds(1));
@@ -75,4 +77,4 @@ public class LegacyJobVertexIdTest {
 		Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId1));
 		Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId2));
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 006f894..5629c0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
@@ -73,7 +74,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -119,7 +121,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -166,7 +169,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -214,7 +218,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -260,7 +265,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -326,7 +332,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}
@@ -383,7 +390,8 @@ public class PointwisePatternTest {
 			cfg,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
-			new NoRestartStrategy());
+			new NoRestartStrategy(),
+			new Scheduler(TestingUtils.defaultExecutionContext()));
 		try {
 			eg.attachJobGraph(ordered);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index afdafe3..d717986 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -187,11 +187,12 @@ public class TerminalStateDeadlockTest {
 				EMPTY_CONFIG,
 				new SerializedValue<>(new ExecutionConfig()),
 				TIMEOUT,
-				new FixedDelayRestartStrategy(1, 0));
+				new FixedDelayRestartStrategy(1, 0),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 		}
 
 		@Override
-		public void scheduleForExecution(SlotProvider slotProvider) {
+		public void scheduleForExecution() {
 			// notify that we are done with the "restarting"
 			synchronized (this) {
 				done = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 4e1bfae..bf17485 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
@@ -87,7 +88,8 @@ public class VertexSlotSharingTest {
 				new Configuration(),
 				new SerializedValue<>(new ExecutionConfig()),
 				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
+				new NoRestartStrategy(),
+				new Scheduler(TestingUtils.defaultExecutionContext()));
 			eg.attachJobGraph(vertices);
 			
 			// verify that the vertices are all in the same slot sharing group

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 258e44e..5f4ea25 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -70,13 +70,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
           new Configuration(),
           new SerializedValue(new ExecutionConfig()),
           AkkaUtils.getDefaultTimeout,
-          new NoRestartStrategy())
+          new NoRestartStrategy(),
+          scheduler)
 
         eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources)
 
         eg.getState should equal(JobStatus.CREATED)
 
-        eg.scheduleForExecution(scheduler)
+        eg.scheduleForExecution()
         eg.getState should equal(JobStatus.RUNNING)
 
         instance1.markDead()

http://git-wip-us.apache.org/repos/asf/flink/blob/5c968563/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index 3c81112..43fe169 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -148,6 +149,7 @@ public class RescalePartitionerTest extends TestLogger {
 			new NoRestartStrategy(),
 			new ArrayList<BlobKey>(),
 			new ArrayList<URL>(),
+			new Scheduler(TestingUtils.defaultExecutionContext()),
 			ExecutionGraph.class.getClassLoader(),
 			new UnregisteredMetricsGroup());
 		try {