You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/22 08:48:48 UTC

[4/4] flink git commit: [FLINK-5085] Execute CheckpointCoordinator's state discard calls asynchronously

[FLINK-5085] Execute CheckpointCoordinator's state discard calls asynchronously

The CheckpointCoordinator is now given an Executor which is used to execute the state discard
calls asynchronously. This will prevent blocking operations to be executed from within the
calling thread.

Shut down ExecutorServices gracefully


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf4b2212
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf4b2212
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf4b2212

Branch: refs/heads/release-1.1
Commit: cf4b221270cff3541bea318f907f9d8207b2fa4d
Parents: f2e4c19
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 17 15:39:11 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 22 09:48:00 2016 +0100

----------------------------------------------------------------------
 .../webmonitor/BackPressureStatsTracker.java    |  2 +-
 .../BackPressureStatsTrackerTest.java           |  5 +-
 .../checkpoint/CheckpointCoordinator.java       | 90 +++++++++++---------
 .../runtime/checkpoint/PendingCheckpoint.java   | 40 +++++++--
 .../savepoint/SavepointCoordinator.java         | 31 ++++---
 .../runtime/executiongraph/ExecutionGraph.java  | 66 ++++++++------
 .../runtime/executiongraph/ExecutionVertex.java |  4 +-
 .../restart/FailureRateRestartStrategy.java     |  2 +-
 .../restart/FixedDelayRestartStrategy.java      |  4 +-
 .../flink/runtime/util/ExecutorUtils.java       | 72 ++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   | 24 ++++--
 .../runtime/minicluster/FlinkMiniCluster.scala  | 12 ++-
 .../testingUtils/TestingJobManager.scala        |  6 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 49 +++++++----
 .../checkpoint/CheckpointStateRestoreTest.java  | 14 +--
 ...ExecutionGraphCheckpointCoordinatorTest.java |  1 +
 .../SavepointCoordinatorRestoreTest.java        | 24 +++---
 .../savepoint/SavepointCoordinatorTest.java     | 24 +++---
 .../executiongraph/AllVerticesIteratorTest.java |  2 +-
 .../ExecutionGraphConstructionTest.java         | 24 ++++--
 .../ExecutionGraphDeploymentTest.java           |  7 +-
 .../ExecutionGraphMetricsTest.java              |  6 +-
 .../ExecutionGraphRestartTest.java              | 17 ++--
 .../ExecutionGraphSignalsTest.java              |  1 +
 .../executiongraph/ExecutionGraphTestUtils.java |  6 +-
 .../ExecutionStateProgressTest.java             |  3 +-
 .../executiongraph/LocalInputSplitsTest.java    |  4 +-
 .../executiongraph/PointwisePatternTest.java    | 21 +++--
 .../TerminalStateDeadlockTest.java              |  1 +
 .../VertexLocationConstraintTest.java           | 20 +++--
 .../executiongraph/VertexSlotSharingTest.java   | 15 ++--
 .../restart/FixedDelayRestartStrategyTest.java  |  6 +-
 .../jobmanager/JobManagerHARecoveryTest.java    | 12 ++-
 .../JobManagerLeaderElectionTest.java           | 31 +++----
 .../flink/runtime/util/TestExecutors.java       | 49 +++++++++++
 .../TaskManagerLossFailsTasksTest.scala         |  1 +
 .../runtime/testingUtils/TestingCluster.scala   |  3 +-
 .../runtime/testingUtils/TestingUtils.scala     |  4 +-
 .../partitioner/RescalePartitionerTest.java     |  1 +
 .../flink/yarn/TestingYarnJobManager.scala      |  9 +-
 .../flink/yarn/YarnApplicationMasterRunner.java | 59 +++++++------
 .../org/apache/flink/yarn/YarnJobManager.scala  |  9 +-
 42 files changed, 527 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
index f890106..aae22a1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -165,7 +165,7 @@ public class BackPressureStatsTracker {
 			if (!pendingStats.contains(vertex) &&
 					!vertex.getGraph().getState().isGloballyTerminalState()) {
 
-				ExecutionContext executionContext = vertex.getGraph().getExecutionContext();
+				ExecutionContext executionContext = vertex.getGraph().getFutureExecutionContext();
 
 				// Only trigger if still active job
 				if (executionContext != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
index b2b0afd..2dee200 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.eq;
@@ -67,7 +66,7 @@ public class BackPressureStatsTrackerTest {
 		when(graph.getState()).thenReturn(JobStatus.RUNNING);
 
 		// Same Thread execution context
-		when(graph.getExecutionContext()).thenReturn(new ExecutionContext() {
+		when(graph.getFutureExecutionContext()).thenReturn(new ExecutionContext() {
 
 			@Override
 			public void execute(Runnable runnable) {
@@ -76,7 +75,7 @@ public class BackPressureStatsTrackerTest {
 
 			@Override
 			public void reportFailure(Throwable t) {
-				fail();
+				// do nothing
 			}
 
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5c517b2..6e7b4b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -54,6 +54,7 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -153,6 +154,8 @@ public class CheckpointCoordinator {
 
 	protected final int numberKeyGroups;
 
+	private final Executor executor;
+
 	// --------------------------------------------------------------------------------------------
 
 	public CheckpointCoordinator(
@@ -166,12 +169,13 @@ public class CheckpointCoordinator {
 			ClassLoader userClassLoader,
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
-			RecoveryMode recoveryMode) throws Exception {
+			RecoveryMode recoveryMode,
+			Executor executor) {
 
 		this(job, baseInterval, checkpointTimeout, 0L, Integer.MAX_VALUE, numberKeyGroups,
 				tasksToTrigger, tasksToWaitFor, tasksToCommitTo,
 				userClassLoader, checkpointIDCounter, completedCheckpointStore, recoveryMode,
-				new DisabledCheckpointStatsTracker());
+				new DisabledCheckpointStatsTracker(), executor);
 	}
 
 	public CheckpointCoordinator(
@@ -188,7 +192,8 @@ public class CheckpointCoordinator {
 			CheckpointIDCounter checkpointIDCounter,
 			CompletedCheckpointStore completedCheckpointStore,
 			RecoveryMode recoveryMode,
-			CheckpointStatsTracker statsTracker) throws Exception {
+			CheckpointStatsTracker statsTracker,
+			Executor executor) {
 
 		// Sanity check
 		checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
@@ -249,6 +254,8 @@ public class CheckpointCoordinator {
 		}
 
 		this.numberKeyGroups = numberKeyGroups;
+
+		this.executor = checkNotNull(executor);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -386,7 +393,7 @@ public class CheckpointCoordinator {
 	 * @param nextCheckpointId The checkpoint ID to use for this checkpoint or <code>-1</code> if
 	 *                         the checkpoint ID counter should be queried.
 	 */
-	public boolean triggerCheckpoint(long timestamp, long nextCheckpointId) throws Exception {
+	public boolean triggerCheckpoint(long timestamp, long nextCheckpointId) {
 		// make some eager pre-checks
 		synchronized (lock) {
 			// abort if the coordinator has been shutdown in the meantime
@@ -473,32 +480,32 @@ public class CheckpointCoordinator {
 
 		LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
 
-		final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+		final PendingCheckpoint checkpoint = new PendingCheckpoint(
+			job,
+			checkpointID,
+			timestamp,
+			ackTasks,
+			executor);
 
 		// schedule the timer that will clean up the expired checkpoints
 		TimerTask canceller = new TimerTask() {
 			@Override
 			public void run() {
-				try {
-					synchronized (lock) {
-						// only do the work if the checkpoint is not discarded anyways
-						// note that checkpoint completion discards the pending checkpoint object
-						if (!checkpoint.isDiscarded()) {
-							LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+				synchronized (lock) {
+					// only do the work if the checkpoint is not discarded anyways
+					// note that checkpoint completion discards the pending checkpoint object
+					if (!checkpoint.isDiscarded()) {
+						LOG.info("Checkpoint " + checkpointID + " expired before completing.");
 
-							checkpoint.discard(userClassLoader);
-							pendingCheckpoints.remove(checkpointID);
-							rememberRecentCheckpointId(checkpointID);
+						checkpoint.discard(userClassLoader);
+						pendingCheckpoints.remove(checkpointID);
+						rememberRecentCheckpointId(checkpointID);
 
-							onCancelCheckpoint(checkpointID);
+						onCancelCheckpoint(checkpointID);
 
-							triggerQueuedRequests();
-						}
+						triggerQueuedRequests();
 					}
 				}
-				catch (Throwable t) {
-					LOG.error("Exception while handling checkpoint timeout", t);
-				}
 			}
 		};
 
@@ -565,7 +572,7 @@ public class CheckpointCoordinator {
 	 * @return Flag indicating whether the declined checkpoint was associated
 	 * with a pending checkpoint.
 	 */
-	public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
+	public boolean receiveDeclineMessage(DeclineCheckpoint message) {
 		if (shutdown || message == null) {
 			return false;
 		}
@@ -714,12 +721,7 @@ public class CheckpointCoordinator {
 								"the state handle to avoid lingering state.", message.getCheckpointId(),
 							message.getTaskExecutionId(), message.getJob());
 
-						try {
-							message.getState().deserializeValue(userClassLoader).discardState();
-						} catch (Exception e) {
-							LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
-								message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
-						}
+						discardState(message.getState());
 						break;
 					case DISCARDED:
 						LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
@@ -727,12 +729,7 @@ public class CheckpointCoordinator {
 								"state handle tp avoid lingering state.",
 							message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
 
-						try {
-							message.getState().deserializeValue(userClassLoader).discardState();
-						} catch (Exception e) {
-							LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
-								message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
-						}
+						discardState(message.getState());
 				}
 			}
 			else if (checkpoint != null) {
@@ -751,13 +748,8 @@ public class CheckpointCoordinator {
 					isPendingCheckpoint = false;
 				}
 
-				try {
-					// try to discard the state so that we don't have lingering state lying around
-					message.getState().deserializeValue(userClassLoader).discardState();
-				} catch (Exception e) {
-					LOG.warn("Could not properly discard state for checkpoint {} of task {} of job {}.",
-						message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), e);
-				}
+				// try to discard the state so that we don't have lingering state lying around
+				discardState(message.getState());
 			}
 		}
 
@@ -788,7 +780,7 @@ public class CheckpointCoordinator {
 		recentPendingCheckpoints.addLast(id);
 	}
 
-	private void dropSubsumedCheckpoints(long timestamp) throws Exception {
+	private void dropSubsumedCheckpoints(long timestamp) {
 		Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
 		while (entries.hasNext()) {
 			PendingCheckpoint p = entries.next().getValue();
@@ -809,7 +801,7 @@ public class CheckpointCoordinator {
 	 *
 	 * <p>NOTE: The caller of this method must hold the lock when invoking the method!
 	 */
-	private void triggerQueuedRequests() throws Exception {
+	private void triggerQueuedRequests() {
 		if (triggerRequestQueued) {
 			triggerRequestQueued = false;
 
@@ -1057,4 +1049,18 @@ public class CheckpointCoordinator {
 			}
 		}
 	}
+
+	private void discardState(final SerializedValue<StateHandle<?>> stateObject) {
+
+		executor.execute(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					stateObject.deserializeValue(userClassLoader).discardState();
+				} catch (Exception e) {
+					LOG.warn("Could not properly discard state object.", e);
+				}
+			}
+		});
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index e81bb09..22ba9f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -27,8 +27,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 /**
  * A pending checkpoint is a checkpoint that has been started, but has not been
@@ -39,6 +44,8 @@ import java.util.Set;
  * state handles always as serialized values, never as actual values.</p>
  */
 public class PendingCheckpoint {
+
+	private static final Logger LOG = LoggerFactory.getLogger(PendingCheckpoint.class);
 		
 	private final Object lock = new Object();
 	
@@ -55,15 +62,20 @@ public class PendingCheckpoint {
 	/** Set of acknowledged tasks */
 	private final Set<ExecutionAttemptID> acknowledgedTasks;
 
+	private final Executor executor;
+
 	private int numAcknowledgedTasks;
 	
 	private boolean discarded;
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public PendingCheckpoint(JobID jobId, long checkpointId, long checkpointTimestamp,
-							Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm)
-	{
+	public PendingCheckpoint(
+			JobID jobId,
+			long checkpointId,
+			long checkpointTimestamp,
+			Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
+			Executor executor) {
 		if (jobId == null || verticesToConfirm == null) {
 			throw new NullPointerException();
 		}
@@ -74,6 +86,7 @@ public class PendingCheckpoint {
 		this.jobId = jobId;
 		this.checkpointId = checkpointId;
 		this.checkpointTimestamp = checkpointTimestamp;
+		this.executor = Preconditions.checkNotNull(executor);
 		
 		this.notYetAcknowledgedTasks = verticesToConfirm;
 		this.taskStates = new HashMap<>();
@@ -218,19 +231,30 @@ public class PendingCheckpoint {
 	/**
 	 * Discards the pending checkpoint, releasing all held resources.
 	 */
-	public void discard(ClassLoader userClassLoader) throws Exception {
+	public void discard(ClassLoader userClassLoader) {
 		dispose(userClassLoader, true);
 	}
 
-	private void dispose(ClassLoader userClassLoader, boolean releaseState) throws Exception {
+	private void dispose(final ClassLoader userClassLoader, boolean releaseState) {
 		synchronized (lock) {
 			discarded = true;
 			numAcknowledgedTasks = -1;
 			try {
 				if (releaseState) {
-					for (TaskState taskState : taskStates.values()) {
-						taskState.discard(userClassLoader);
-					}
+					executor.execute(new Runnable() {
+						@Override
+						public void run() {
+							try {
+								for (TaskState taskState: taskStates.values()) {
+									taskState.discard(userClassLoader);
+								}
+							} catch (Exception e) {
+								LOG.warn("Could not properly dispose the pending checkpoint " +
+									"{} of job {}.", checkpointId, jobId, e);
+							}
+						}
+					});
+
 				}
 			} finally {
 				taskStates.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
index 2cb8636..a365795 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinator.java
@@ -49,6 +49,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -96,22 +97,24 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 			ClassLoader userClassLoader,
 			CheckpointIDCounter checkpointIDCounter,
 			SavepointStore savepointStore,
-			CheckpointStatsTracker statsTracker) throws Exception {
+			CheckpointStatsTracker statsTracker,
+			Executor executor) {
 
 		super(jobId,
-				baseInterval,
-				checkpointTimeout,
-				0L,
-				Integer.MAX_VALUE,
-				numberKeyGroups,
-				tasksToTrigger,
-				tasksToWaitFor,
-				tasksToCommitTo,
-				userClassLoader,
-				checkpointIDCounter,
-				IgnoreCheckpointsStore.INSTANCE,
-				RecoveryMode.STANDALONE,
-				statsTracker);
+			baseInterval,
+			checkpointTimeout,
+			0L,
+			Integer.MAX_VALUE,
+			numberKeyGroups,
+			tasksToTrigger,
+			tasksToWaitFor,
+			tasksToCommitTo,
+			userClassLoader,
+			checkpointIDCounter,
+			IgnoreCheckpointsStore.INSTANCE,
+			RecoveryMode.STANDALONE,
+			statsTracker,
+			executor);
 
 		this.savepointStore = checkNotNull(savepointStore);
 		this.savepointPromises = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index de337ab..bfd93e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -59,10 +59,12 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -82,6 +84,7 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -221,8 +224,12 @@ public class ExecutionGraph implements Serializable {
 	private CheckpointStatsTracker checkpointStatsTracker;
 
 	/** The execution context which is used to execute futures. */
-	@SuppressWarnings("NonSerializableFieldInSerializableClass")
-	private ExecutionContext executionContext;
+	private final transient Executor futureExecutor;
+
+	private final transient ExecutionContext futureExecutionContext;
+
+	/** The executor which is used to execute blocking io operations */
+	private final transient Executor ioExecutor;
 
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private String jsonPlan;
@@ -238,7 +245,8 @@ public class ExecutionGraph implements Serializable {
 	 * This constructor is for tests only, because it does not include class loading information.
 	 */
 	ExecutionGraph(
-			ExecutionContext executionContext,
+			Executor futureExecutor,
+			Executor ioExecutor,
 			JobID jobId,
 			String jobName,
 			Configuration jobConfig,
@@ -246,7 +254,8 @@ public class ExecutionGraph implements Serializable {
 			FiniteDuration timeout,
 			RestartStrategy restartStrategy) throws IOException {
 		this(
-			executionContext,
+			futureExecutor,
+			ioExecutor,
 			jobId,
 			jobName,
 			jobConfig,
@@ -261,7 +270,8 @@ public class ExecutionGraph implements Serializable {
 	}
 
 	public ExecutionGraph(
-			ExecutionContext executionContext,
+			Executor futureExecutor,
+			Executor ioExecutor,
 			JobID jobId,
 			String jobName,
 			Configuration jobConfig,
@@ -289,6 +299,9 @@ public class ExecutionGraph implements Serializable {
 		// serialize the job information to do the serialisation work only once
 		this.serializedJobInformation = new SerializedValue<>(jobInformation);
 
+		this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
+		this.futureExecutionContext = ExecutionContext$.MODULE$.fromExecutor(futureExecutor);
+		this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
 
 		this.userClassLoader = userClassLoader;
 
@@ -309,8 +322,6 @@ public class ExecutionGraph implements Serializable {
 
 		metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());
 
-		this.executionContext = checkNotNull(executionContext);
-
 		// create a summary of all relevant data accessed in the web interface's JobConfigHandler
 		try {
 			ExecutionConfig executionConfig = serializedConfig.deserializeValue(userClassLoader);
@@ -393,20 +404,21 @@ public class ExecutionGraph implements Serializable {
 		if (interval != Long.MAX_VALUE) {
 			// create the coordinator that triggers and commits checkpoints and holds the state
 			checkpointCoordinator = new CheckpointCoordinator(
-				jobInformation.getJobId(),
-				interval,
-				checkpointTimeout,
-				minPauseBetweenCheckpoints,
-				maxConcurrentCheckpoints,
-				numberKeyGroups,
-				tasksToTrigger,
-				tasksToWaitFor,
-				tasksToCommitTo,
-				userClassLoader,
-				checkpointIDCounter,
-				checkpointStore,
-				recoveryMode,
-				checkpointStatsTracker);
+			jobInformation.getJobId(),
+			interval,
+			checkpointTimeout,
+			minPauseBetweenCheckpoints,
+			maxConcurrentCheckpoints,
+			numberKeyGroups,
+			tasksToTrigger,
+			tasksToWaitFor,
+			tasksToCommitTo,
+			userClassLoader,
+			checkpointIDCounter,
+			checkpointStore,
+			recoveryMode,
+			checkpointStatsTracker,
+			ioExecutor);
 
 			// the periodic checkpoint scheduler is activated and deactivated as a result of
 			// job status changes (running -> on, all other states -> off)
@@ -428,7 +440,8 @@ public class ExecutionGraph implements Serializable {
 			// checkpoint coordinator.
 			checkpointIDCounter,
 			savepointStore,
-			checkpointStatsTracker);
+			checkpointStatsTracker,
+			ioExecutor);
 
 		registerJobStatusListener(savepointCoordinator
 			.createActivatorDeactivator(actorSystem, leaderSessionID));
@@ -623,8 +636,12 @@ public class ExecutionGraph implements Serializable {
 	 *
 	 * @return ExecutionContext associated with this ExecutionGraph
 	 */
-	public ExecutionContext getExecutionContext() {
-		return executionContext;
+	public Executor getFutureExecutor() {
+		return futureExecutor;
+	}
+
+	public ExecutionContext getFutureExecutionContext() {
+		return futureExecutionContext;
 	}
 
 	/**
@@ -1020,7 +1037,6 @@ public class ExecutionGraph implements Serializable {
 		restartStrategy = null;
 		scheduler = null;
 		checkpointCoordinator = null;
-		executionContext = null;
 
 		for (ExecutionJobVertex vertex : verticesInCreationOrder) {
 			vertex.prepareForArchiving();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index c101548..b48fa27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -129,7 +129,7 @@ public class ExecutionVertex implements Serializable {
 		this.priorExecutions = new CopyOnWriteArrayList<Execution>();
 
 		this.currentExecution = new Execution(
-			getExecutionGraph().getExecutionContext(),
+			getExecutionGraph().getFutureExecutionContext(),
 			this,
 			0,
 			createTimestamp,
@@ -430,7 +430,7 @@ public class ExecutionVertex implements Serializable {
 			if (state == FINISHED || state == CANCELED || state == FAILED) {
 				priorExecutions.add(execution);
 				currentExecution = new Execution(
-					getExecutionGraph().getExecutionContext(),
+					getExecutionGraph().getFutureExecutionContext(),
 					this,
 					execution.getAttemptNumber()+1,
 					System.currentTimeMillis(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index 528eacb..c6551c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -71,7 +71,7 @@ public class FailureRateRestartStrategy implements RestartStrategy {
 			restartTimestampsDeque.remove();
 		}
 		restartTimestampsDeque.add(System.currentTimeMillis());
-		future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getExecutionContext());
+		future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getFutureExecutionContext());
 	}
 
 	private boolean isRestartTimestampsQueueFull() {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 8053e95..69fd3db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -59,7 +59,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 	@Override
 	public void restart(final ExecutionGraph executionGraph) {
 		currentRestartAttempt++;
-		future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getExecutionContext());
+		future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getFutureExecutionContext());
 	}
 
 	/**
@@ -125,4 +125,4 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 			return new FixedDelayRestartStrategy(maxAttempts, delay);
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorUtils.java
new file mode 100644
index 0000000..d2012a0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ExecutorUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ExecutorUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutorUtils.class);
+
+	/**
+	 * Gracefully shutdown the given {@link ExecutorService}. The call waits the given timeout that
+	 * all ExecutorServices terminate. If the ExecutorServices do not terminate in this time,
+	 * they will be shut down hard.
+	 *
+	 * @param timeout to wait for the termination of all ExecutorServices
+	 * @param unit of the timeout
+	 * @param executorServices to shut down
+	 */
+	public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
+		for (ExecutorService executorService : executorServices) {
+			executorService.shutdown();
+		}
+
+		boolean wasInterrupted = false;
+		final long endTime = unit.toMillis(timeout) + System.currentTimeMillis();
+		long timeLeft = unit.toMillis(timeout);
+		boolean hasTimeLeft = timeLeft > 0L;
+
+		for (ExecutorService executorService : executorServices) {
+			if (wasInterrupted || !hasTimeLeft) {
+				executorService.shutdownNow();
+			} else {
+				try {
+					if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
+						LOG.warn("ExecutorService did not terminate in time. Shutting it down now.");
+						executorService.shutdownNow();
+					}
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted while shutting down executor services. Shutting all " +
+						"remaining ExecutorServices down now.", e);
+					executorService.shutdownNow();
+
+					wasInterrupted = true;
+				}
+
+				timeLeft = endTime - System.currentTimeMillis();
+				hasTimeLeft = timeLeft > 0L;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ad998de..4a4968f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -114,7 +114,8 @@ import scala.language.postfixOps
  */
 class JobManager(
     protected val flinkConfiguration: Configuration,
-    protected val executor: Executor,
+    protected val futureExecutor: Executor,
+    protected val ioExecutor: Executor,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
@@ -137,7 +138,7 @@ class JobManager(
 
   /** The extra execution context, for futures, with a custom logging reporter */
   protected val executionContext: ExecutionContext = ExecutionContext.fromExecutor(
-    executor,
+    futureExecutor,
     (t: Throwable) => {
       if (!context.system.isTerminated) {
         log.error("Executor could not execute task", t)
@@ -1106,7 +1107,8 @@ class JobManager(
             graph
           case None =>
             val graph = new ExecutionGraph(
-              executionContext,
+              futureExecutor,
+              ioExecutor,
               jobGraph.getJobID,
               jobGraph.getName,
               jobGraph.getJobConfiguration,
@@ -2018,8 +2020,9 @@ object JobManager {
 
     val ioExecutor = Executors.newFixedThreadPool(
       numberProcessors,
-      new NamedThreadFactory("jobmanager-io-", "-thread-")
-    )
+      new NamedThreadFactory("jobmanager-io-", "-thread-"))
+
+    val timeout = AkkaUtils.getTimeout(configuration)
 
     val (jobManagerSystem, _, _, webMonitorOption, _) = try {
       startActorSystemAndJobManagerActors(
@@ -2035,7 +2038,8 @@ object JobManager {
       )
     } catch {
       case t: Throwable =>
-          futureExecutor.shutdownNow()
+        futureExecutor.shutdownNow()
+        ioExecutor.shutdownNow()
 
         throw t
     }
@@ -2053,8 +2057,11 @@ object JobManager {
         }
     }
 
-    futureExecutor.shutdownNow()
-    ioExecutor.shutdownNow()
+    ExecutorUtils.gracefulShutdown(
+      timeout.toMillis,
+      TimeUnit.MILLISECONDS,
+      futureExecutor,
+      ioExecutor)
   }
 
   /**
@@ -2670,6 +2677,7 @@ object JobManager {
       jobManagerClass,
       configuration,
       futureExecutor,
+      ioExecutor,
       instanceManager,
       scheduler,
       libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 57f0a83..4136f8f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -20,12 +20,13 @@ package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
 import java.util.UUID
-import java.util.concurrent.Executors
+import java.util.concurrent.{Executors, TimeUnit}
 
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.{JobExecutionResult, JobID, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.AkkaUtils
@@ -35,7 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.RecoveryMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService, StandaloneLeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
-import org.apache.flink.runtime.util.{NamedThreadFactory, ZooKeeperUtils}
+import org.apache.flink.runtime.util.{ExecutorUtils, NamedThreadFactory, ZooKeeperUtils}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.slf4j.LoggerFactory
 
@@ -378,8 +379,11 @@ abstract class FlinkMiniCluster(
     jobManagerLeaderRetrievalService.foreach(_.stop())
     isRunning = false
 
-    futureExecutor.shutdownNow()
-    ioExecutor.shutdownNow()
+    ExecutorUtils.gracefulShutdown(
+      timeout.toMillis,
+      TimeUnit.MILLISECONDS,
+      futureExecutor,
+      ioExecutor)
   }
 
   protected def shutdown(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index e9db9b0..2023036 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -39,7 +39,8 @@ import java.util.concurrent.Executor
   */
 class TestingJobManager(
     flinkConfiguration: Configuration,
-    executor: Executor,
+    futureExecutor: Executor,
+    ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
@@ -54,7 +55,8 @@ class TestingJobManager(
     metricRegistry : Option[MetricRegistry])
   extends JobManager(
     flinkConfiguration,
-    executor,
+    futureExecutor,
+    ioExecutor,
     instanceManager,
     scheduler,
     libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index f05b5d2..35cce85 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -94,7 +95,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -145,7 +147,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -194,7 +197,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			// nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -244,7 +248,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -342,7 +347,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -461,7 +467,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -610,7 +617,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -744,7 +752,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(10, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -864,7 +873,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			// trigger a checkpoint, partially acknowledged
 			assertTrue(coord.triggerCheckpoint(timestamp));
@@ -931,7 +941,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			assertTrue(coord.triggerCheckpoint(timestamp));
 
@@ -992,7 +1003,8 @@ public class CheckpointCoordinatorTest {
 			cl,
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(2, cl),
-			RecoveryMode.STANDALONE);
+			RecoveryMode.STANDALONE,
+			TestExecutors.directExecutor());
 
 		assertTrue(coord.triggerCheckpoint(timestamp));
 
@@ -1121,7 +1133,8 @@ public class CheckpointCoordinatorTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			
 			coord.startCheckpointScheduler();
@@ -1214,7 +1227,8 @@ public class CheckpointCoordinatorTest {
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl),
 				RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker());
+				new DisabledCheckpointStatsTracker(),
+				TestExecutors.directExecutor());
 
 			coord.startCheckpointScheduler();
 
@@ -1308,7 +1322,8 @@ public class CheckpointCoordinatorTest {
 				new ExecutionVertex[] { ackVertex },
 				new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
 				(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker());
+				new DisabledCheckpointStatsTracker(),
+				TestExecutors.directExecutor());
 
 			coord.startCheckpointScheduler();
 
@@ -1379,7 +1394,8 @@ public class CheckpointCoordinatorTest {
 				new ExecutionVertex[] { ackVertex },
 				new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
 				(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker());
+				new DisabledCheckpointStatsTracker(),
+				TestExecutors.directExecutor());
 
 			coord.startCheckpointScheduler();
 
@@ -1459,7 +1475,8 @@ public class CheckpointCoordinatorTest {
 				new ExecutionVertex[] { ackVertex },
 				new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE,
-				new DisabledCheckpointStatsTracker());
+				new DisabledCheckpointStatsTracker(),
+				TestExecutors.directExecutor());
 			
 			coord.startCheckpointScheduler();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 2b1b7e1..e535645 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -91,7 +92,8 @@ public class CheckpointStateRestoreTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -168,7 +170,8 @@ public class CheckpointStateRestoreTest {
 				cl,
 				new StandaloneCheckpointIDCounter(),
 				new StandaloneCompletedCheckpointStore(1, cl),
-				RecoveryMode.STANDALONE);
+				RecoveryMode.STANDALONE,
+				TestExecutors.directExecutor());
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
@@ -206,15 +209,16 @@ public class CheckpointStateRestoreTest {
 	public void testNoCheckpointAvailable() {
 		try {
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-				new JobID(),
+			new JobID(),
 				200000L,
 				200000L,
 				42,
 				new ExecutionVertex[] { mock(ExecutionVertex.class) },
 				new ExecutionVertex[] { mock(ExecutionVertex.class) },
 				new ExecutionVertex[0], cl,
-				new StandaloneCheckpointIDCounter(),
-				new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
+			new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE,
+			TestExecutors.directExecutor());
 
 			try {
 				coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index ddbc463..c23aaf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -126,6 +126,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 			CompletedCheckpointStore store) throws Exception {
 		ExecutionGraph executionGraph = new ExecutionGraph(
 				TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 				new JobID(),
 				"test",
 				new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
index cd03403..43d6530 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorRestoreTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -72,17 +73,18 @@ public class SavepointCoordinatorRestoreTest {
 		SavepointStore store = new HeapSavepointStore();
 
 		SavepointCoordinator coord = new SavepointCoordinator(
-				new JobID(),
-				Integer.MAX_VALUE,
-				Integer.MAX_VALUE,
-				0,
-				new ExecutionVertex[] {},
-				new ExecutionVertex[] {},
-				new ExecutionVertex[] {},
-				getClass().getClassLoader(),
-				new StandaloneCheckpointIDCounter(),
-				store,
-				new DisabledCheckpointStatsTracker());
+			new JobID(),
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			0,
+			new ExecutionVertex[] {},
+			new ExecutionVertex[] {},
+			new ExecutionVertex[] {},
+			getClass().getClassLoader(),
+			new StandaloneCheckpointIDCounter(),
+			store,
+			new DisabledCheckpointStatsTracker(),
+			TestExecutors.directExecutor());
 
 		// --- (2) Checkpoint misses state for a jobVertex (should work) ---
 		Map<JobVertexID, TaskState> checkpointTaskStates = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
index dc2b23f..dac4903 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -878,17 +879,18 @@ public class SavepointCoordinatorTest extends TestLogger {
 		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
 
 		return new SavepointCoordinator(
-				jobId,
-				checkpointTimeout,
-				checkpointTimeout,
-				42,
-				triggerVertices,
-				ackVertices,
-				commitVertices,
-				classLoader,
-				checkpointIdCounter,
-				savepointStore,
-				new DisabledCheckpointStatsTracker());
+			jobId,
+			checkpointTimeout,
+			checkpointTimeout,
+			42,
+			triggerVertices,
+			ackVertices,
+			commitVertices,
+			classLoader,
+			checkpointIdCounter,
+			savepointStore,
+			new DisabledCheckpointStatsTracker(),
+			TestExecutors.directExecutor());
 	}
 
 	private static Map<JobVertexID, ExecutionJobVertex> createExecutionJobVertexMap(

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
index 8b2c21d..a9a9d4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AllVerticesIteratorTest.java
@@ -50,7 +50,7 @@ public class AllVerticesIteratorTest {
 			v4.setParallelism(2);
 			
 			ExecutionGraph eg = Mockito.mock(ExecutionGraph.class);
-			Mockito.when(eg.getExecutionContext()).thenReturn(TestingUtils.directExecutionContext());
+			Mockito.when(eg.getFutureExecutionContext()).thenReturn(TestingUtils.directExecutionContext());
 					
 			ExecutionJobVertex ejv1 = new ExecutionJobVertex(eg, v1, 1,
 					AkkaUtils.getDefaultTimeout());

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 6f6fcd0..bf3a17c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -112,7 +112,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -161,7 +162,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -235,7 +237,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -494,7 +497,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -558,7 +562,8 @@ public class ExecutionGraphConstructionTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v5, v4));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -626,7 +631,8 @@ public class ExecutionGraphConstructionTest {
 			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId, 
 				jobName, 
 				cfg,
@@ -672,7 +678,8 @@ public class ExecutionGraphConstructionTest {
 			List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));
 
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId, 
 				jobName,
 				cfg,
@@ -753,7 +760,8 @@ public class ExecutionGraphConstructionTest {
 			JobGraph jg = new JobGraph(jobId, jobName, v1, v2, v3, v4, v5, v6, v7, v8);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId, 
 				jobName, 
 				cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index cfbde6a..cc0ffba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -93,7 +93,8 @@ public class ExecutionGraphDeploymentTest {
 			v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
 
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId, 
 				"some job", 
 				new Configuration(),
@@ -317,6 +318,7 @@ public class ExecutionGraphDeploymentTest {
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.directExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId,
 			"failing test job",
 			new Configuration(),
@@ -359,7 +361,8 @@ public class ExecutionGraphDeploymentTest {
 
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.directExecutionContext(), 
+			TestingUtils.directExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			"some job", 
 			new Configuration(), 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 788f8b9..3e6aba5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -46,11 +46,11 @@ import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.mockito.Matchers;
-import scala.concurrent.ExecutionContext$;
 import scala.concurrent.Future$;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -61,7 +61,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -135,7 +134,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 		TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
 
 		ExecutionGraph executionGraph = new ExecutionGraph(
-			ExecutionContext$.MODULE$.fromExecutor(new ForkJoinPool()),
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobGraph.getJobID(),
 			jobGraph.getName(),
 			jobConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 3041ad3..659a912 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -228,6 +228,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		// Blocking program
 		ExecutionGraph executionGraph = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			new JobID(),
 			"TestJob",
 			new Configuration(),
@@ -539,6 +540,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			new JobID(),
 			"Test job",
 			new Configuration(),
@@ -670,13 +672,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
 		return new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				new JobID(),
-				"Test job",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				restartStrategy);
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
+			new JobID(),
+			"Test job",
+			new Configuration(),
+			new SerializedValue<>(new ExecutionConfig()),
+			AkkaUtils.getDefaultTimeout(),
+			restartStrategy);
 	}
 
 	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index de4a026..fde967e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -133,6 +133,7 @@ public class ExecutionGraphSignalsTest {
 
 		eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId,
 			jobName,
 			cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 903d5f9..89d8a9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.spy;
 
 import java.lang.reflect.Field;
 import java.net.InetAddress;
+import java.util.concurrent.Executor;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
@@ -167,12 +168,13 @@ public class ExecutionGraphTestUtils {
 
 	public static final String ERROR_MESSAGE = "test_failure_error_message";
 
-	public static ExecutionJobVertex getExecutionVertex(JobVertexID id, ExecutionContext executionContext) throws Exception {
+	public static ExecutionJobVertex getExecutionVertex(JobVertexID id, Executor executor) throws Exception {
 		JobVertex ajv = new JobVertex("TestVertex", id);
 		ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
 		ExecutionGraph graph = new ExecutionGraph(
-			executionContext, 
+			executor,
+			executor,
 			new JobID(), 
 			"test job", 
 			new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index f3743c1..955c9db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -51,7 +51,8 @@ public class ExecutionStateProgressTest {
 			ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
 
 			ExecutionGraph graph = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jid, 
 				"test job", 
 				new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
index f03370c..11dad92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java
@@ -270,7 +270,8 @@ public class LocalInputSplitsTest {
 			JobGraph jobGraph = new JobGraph("test job", vertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(), 
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobGraph.getJobID(),
 				jobGraph.getName(),  
 				jobGraph.getJobConfiguration(),
@@ -336,6 +337,7 @@ public class LocalInputSplitsTest {
 		
 		ExecutionGraph eg = new ExecutionGraph(
 			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobGraph.getJobID(),
 			jobGraph.getName(),  
 			jobGraph.getJobConfiguration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index 7a28b4a..0e147e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -66,7 +66,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -111,7 +112,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -157,7 +159,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -204,7 +207,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName,
 			cfg,
@@ -249,7 +253,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -314,7 +319,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,
@@ -370,7 +376,8 @@ public class PointwisePatternTest {
 		List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
 
 		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(), 
+			TestingUtils.defaultExecutionContext(),
+			TestingUtils.defaultExecutionContext(),
 			jobId, 
 			jobName, 
 			cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index 2a690d9..504822b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -184,6 +184,7 @@ public class TerminalStateDeadlockTest {
 		TestExecGraph(JobID jobId) throws IOException {
 			super(
 				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
 				jobId,
 				"test graph",
 				EMPTY_CONFIG,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index d145dd3..b160561 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -81,13 +81,14 @@ public class VertexLocationConstraintTest {
 			JobGraph jg = new JobGraph("test job", jobVertex);
 			
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					jg.getJobID(),
-					jg.getName(),
-					jg.getJobConfiguration(),
-					new SerializedValue<>(new ExecutionConfig()),
-					timeout,
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
+				jg.getJobID(),
+				jg.getName(),
+				jg.getJobConfiguration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				timeout,
+				new NoRestartStrategy());
 			eg.attachJobGraph(Collections.singletonList(jobVertex));
 			
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
@@ -155,6 +156,7 @@ public class VertexLocationConstraintTest {
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
+					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
@@ -232,6 +234,7 @@ public class VertexLocationConstraintTest {
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
+					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
@@ -300,6 +303,7 @@ public class VertexLocationConstraintTest {
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
+					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
@@ -370,6 +374,7 @@ public class VertexLocationConstraintTest {
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
+					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),
@@ -411,6 +416,7 @@ public class VertexLocationConstraintTest {
 			
 			ExecutionGraph eg = new ExecutionGraph(
 					TestingUtils.defaultExecutionContext(),
+					TestingUtils.defaultExecutionContext(),
 					jg.getJobID(),
 					jg.getName(),
 					jg.getJobConfiguration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 0c95695..27708a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -80,13 +80,14 @@ public class VertexSlotSharingTest {
 			List<JobVertex> vertices = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 			
 			ExecutionGraph eg = new ExecutionGraph(
-					TestingUtils.defaultExecutionContext(),
-					new JobID(),
-					"test job",
-					new Configuration(),
-					new SerializedValue<>(new ExecutionConfig()),
-					AkkaUtils.getDefaultTimeout(),
-					new NoRestartStrategy());
+				TestingUtils.defaultExecutionContext(),
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"test job",
+				new Configuration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				AkkaUtils.getDefaultTimeout(),
+				new NoRestartStrategy());
 			eg.attachJobGraph(vertices);
 			
 			// verify that the vertices are all in the same slot sharing group

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
index d19060a..ceb5da4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.executiongraph.restart;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.util.TestExecutors;
 import org.junit.Test;
 import org.mockito.Mockito;
 import scala.concurrent.ExecutionContext$;
@@ -39,8 +39,8 @@ public class FixedDelayRestartStrategyTest {
 			restartDelay);
 
 		ExecutionGraph executionGraph = mock(ExecutionGraph.class);
-		when(executionGraph.getExecutionContext())
-			.thenReturn(ExecutionContext$.MODULE$.fromExecutor(MoreExecutors.directExecutor()));
+		when(executionGraph.getFutureExecutionContext())
+			.thenReturn(ExecutionContext$.MODULE$.fromExecutor(TestExecutors.directExecutor()));
 
 		while(fixedDelayRestartStrategy.canRestart()) {
 			fixedDelayRestartStrategy.restart(executionGraph);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 2e3bace..b98f338 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -82,6 +82,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
@@ -127,6 +128,8 @@ public class JobManagerHARecoveryTest {
 		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.newFolder().toString());
 		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
 
+		ExecutorService executor = null;
+
 		try {
 			Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 
@@ -144,10 +147,13 @@ public class JobManagerHARecoveryTest {
 				MemoryArchivist.class,
 				10), "archive");
 
+			executor = new ForkJoinPool();
+
 			Props jobManagerProps = Props.create(
 				TestingJobManager.class,
 				flinkConfiguration,
-				new ForkJoinPool(),
+				executor,
+				executor,
 				instanceManager,
 				scheduler,
 				new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
@@ -274,6 +280,10 @@ public class JobManagerHARecoveryTest {
 			if (taskManager != null) {
 				taskManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
 			}
+
+			if (executor != null) {
+				executor.shutdownNow();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index bdd019d..8f38b5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -186,21 +186,22 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 		SavepointStore savepointStore = SavepointStoreFactory.createFromConfig(configuration);
 
 		return Props.create(
-				TestingJobManager.class,
-				configuration,
-				executor,
-				new InstanceManager(),
-				new Scheduler(TestingUtils.defaultExecutionContext()),
-				new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
-				ActorRef.noSender(),
-				new NoRestartStrategy.NoRestartStrategyFactory(),
-				AkkaUtils.getDefaultTimeout(),
-				leaderElectionService,
-				submittedJobGraphStore,
-				checkpointRecoveryFactory,
-				savepointStore,
-				AkkaUtils.getDefaultTimeout(),
-				Option.apply(null)
+			TestingJobManager.class,
+			configuration,
+			executor,
+			executor,
+			new InstanceManager(),
+			new Scheduler(TestingUtils.defaultExecutionContext()),
+			new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
+			ActorRef.noSender(),
+			new NoRestartStrategy.NoRestartStrategyFactory(),
+			AkkaUtils.getDefaultTimeout(),
+			leaderElectionService,
+			submittedJobGraphStore,
+			checkpointRecoveryFactory,
+			savepointStore,
+			AkkaUtils.getDefaultTimeout(),
+			Option.apply(null)
 		);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
index 703593c..d2cf9e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/TestExecutors.java
@@ -18,10 +18,17 @@
 
 package org.apache.flink.runtime.util;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 public class TestExecutors {
 
+	private static final Logger LOG = LoggerFactory.getLogger(TestExecutors.class);
+
 	public static Executor directExecutor() {
 		return DirectExecutor.INSTANCE;
 	}
@@ -37,4 +44,46 @@ public class TestExecutors {
 			command.run();
 		}
 	}
+
+	/**
+	 * Gracefully shutdown the given {@link ExecutorService}. The call waits the given timeout that
+	 * all ExecutorServices terminate. If the ExecutorServices do not terminate in this time,
+	 * they will be shut down hard.
+	 *
+	 * @param timeout to wait for the termination of all ExecutorServices
+	 * @param unit of the timeout
+	 * @param executorServices to shut down
+	 */
+	public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
+		for (ExecutorService executorService: executorServices) {
+			executorService.shutdown();
+		}
+
+		boolean wasInterrupted = false;
+		final long endTime = unit.toMillis(timeout) + System.currentTimeMillis();
+		long timeLeft = unit.toMillis(timeout);
+		boolean hasTimeLeft = timeLeft > 0L;
+
+		for (ExecutorService executorService: executorServices) {
+			if (wasInterrupted || !hasTimeLeft) {
+				executorService.shutdownNow();
+			} else {
+				try {
+					if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
+						LOG.warn("ExecutorService did not terminate in time. Shutting it down now.");
+						executorService.shutdownNow();
+					}
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted while shutting down executor services. Shutting all " +
+						"remaining ExecutorServices down now.", e);
+					executorService.shutdownNow();
+
+					wasInterrupted = true;
+				}
+
+				timeLeft = endTime - System.currentTimeMillis();
+				hasTimeLeft = timeLeft > 0L;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index c30d244..7c77ce7 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -55,6 +55,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers {
 
         val eg = new ExecutionGraph(
           TestingUtils.defaultExecutionContext,
+          TestingUtils.defaultExecutionContext,
           new JobID(),
           "test job",
           new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index c7c141a..f16dc1c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -119,7 +119,8 @@ class TestingCluster(
     val jobManagerProps = Props(
       new TestingJobManager(
         configuration,
-        futureExecutor,
+      futureExecutor,
+      ioExecutor,
         instanceManager,
         scheduler,
         libraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4b2212/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index fadef28..f400685 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegistere
 import org.apache.flink.runtime.taskmanager.TaskManager
 
 import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor}
 import scala.language.postfixOps
 
 /**
@@ -112,7 +112,7 @@ object TestingUtils {
     * @param actionQueue
     */
   class QueuedActionExecutionContext private[testingUtils] (val actionQueue: ActionQueue)
-    extends ExecutionContext {
+    extends ExecutionContextExecutor {
 
     var automaticExecution = false