You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/01 21:28:21 UTC

flink git commit: [FLINK-7317] [futures] Replace Flink's futures with Java 8's CompletableFuture in ExecutionGraph

Repository: flink
Updated Branches:
  refs/heads/master d2a8e3741 -> 7e4694b81


[FLINK-7317] [futures] Replace Flink's futures with Java 8's CompletableFuture in ExecutionGraph

Change FutureUtils.retry to work with CompletableFutures

Let ConjunctFuture extend CompletableFuture

Remove Flink's futures from ExecutionGraph

This closes #4433.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e4694b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e4694b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e4694b8

Branch: refs/heads/master
Commit: 7e4694b8198664300b13b5304c62271b192d4512
Parents: d2a8e37
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 16:33:21 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 1 23:27:56 2017 +0200

----------------------------------------------------------------------
 .../webmonitor/StackTraceSampleCoordinator.java |  14 +-
 .../StackTraceSampleCoordinatorTest.java        |  10 +-
 .../flink/runtime/concurrent/FutureUtils.java   | 107 ++++++-------
 .../flink/runtime/executiongraph/Execution.java | 152 +++++++++----------
 .../executiongraph/ExecutionAndSlot.java        |   7 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  45 +++---
 .../executiongraph/ExecutionGraphUtils.java     |   6 +-
 .../executiongraph/ExecutionJobVertex.java      |  19 +--
 .../runtime/executiongraph/ExecutionVertex.java |   4 +-
 .../executiongraph/failover/FailoverRegion.java |  14 +-
 .../failover/RestartIndividualStrategy.java     |  14 +-
 .../runtime/blob/BlobServerDeleteTest.java      |  20 +--
 .../flink/runtime/blob/BlobServerGetTest.java   |  20 +--
 .../flink/runtime/blob/BlobServerPutTest.java   |  18 +--
 .../runtime/concurrent/FutureUtilsTest.java     |  61 ++++----
 .../executiongraph/ExecutionGraphUtilsTest.java |  18 +--
 16 files changed, 236 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
index 3521f58..df15b48 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -159,13 +158,12 @@ public class StackTraceSampleCoordinator {
 
 			// Trigger all samples
 			for (Execution execution: executions) {
-				final CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = FutureUtils.toJava(
-					execution.requestStackTraceSample(
-						sampleId,
-						numSamples,
-						delayBetweenSamples,
-						maxStackTraceDepth,
-						timeout));
+				final CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = execution.requestStackTraceSample(
+					sampleId,
+					numSamples,
+					delayBetweenSamples,
+					maxStackTraceDepth,
+					timeout);
 
 				stackTraceSampleFuture.handleAsync(
 					(StackTraceSampleResponse stackTraceSampleResponse, Throwable throwable) -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
index 7d8535a..08c4212 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -394,13 +393,16 @@ public class StackTraceSampleCoordinatorTest extends TestLogger {
 			boolean sendSuccess) {
 
 		Execution exec = mock(Execution.class);
+		CompletableFuture<StackTraceSampleResponse> failedFuture = new CompletableFuture<>();
+		failedFuture.completeExceptionally(new Exception("Send failed."));
+
 		when(exec.getAttemptId()).thenReturn(executionId);
 		when(exec.getState()).thenReturn(state);
 		when(exec.requestStackTraceSample(anyInt(), anyInt(), any(Time.class), anyInt(), any(Time.class)))
 			.thenReturn(
 				sendSuccess ?
-					FlinkCompletableFuture.completed(mock(StackTraceSampleResponse.class)) :
-					FlinkCompletableFuture.completedExceptionally(new Exception("Send failed")));
+					CompletableFuture.completedFuture(mock(StackTraceSampleResponse.class)) :
+					failedFuture);
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
@@ -415,7 +417,7 @@ public class StackTraceSampleCoordinatorTest extends TestLogger {
 		ScheduledExecutorService scheduledExecutorService,
 		int timeout) {
 
-		final FlinkCompletableFuture<StackTraceSampleResponse> future = new FlinkCompletableFuture<>();
+		final CompletableFuture<StackTraceSampleResponse> future = new CompletableFuture<>();
 
 		Execution exec = mock(Execution.class);
 		when(exec.getAttemptId()).thenReturn(executionId);

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 8721e52..eb0c07d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -50,42 +50,38 @@ public class FutureUtils {
 	 * @param <T> type of the result
 	 * @return Future containing either the result of the operation or a {@link RetryException}
 	 */
-	public static <T> Future<T> retry(
-		final Callable<Future<T>> operation,
+	public static <T> java.util.concurrent.CompletableFuture<T> retry(
+		final Callable<java.util.concurrent.CompletableFuture<T>> operation,
 		final int retries,
 		final Executor executor) {
 
-		Future<T> operationResultFuture;
+		java.util.concurrent.CompletableFuture<T> operationResultFuture;
 
 		try {
 			operationResultFuture = operation.call();
 		} catch (Exception e) {
-			return FlinkCompletableFuture.completedExceptionally(
-				new RetryException("Could not execute the provided operation.", e));
+			java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
+			exceptionResult.completeExceptionally(new RetryException("Could not execute the provided operation.", e));
+			return exceptionResult;
 		}
 
-		return operationResultFuture.handleAsync(new BiFunction<T, Throwable, Future<T>>() {
-			@Override
-			public Future<T> apply(T t, Throwable throwable) {
+		return operationResultFuture.handleAsync(
+			(t, throwable) -> {
 				if (throwable != null) {
 					if (retries > 0) {
 						return retry(operation, retries - 1, executor);
 					} else {
-						return FlinkCompletableFuture.completedExceptionally(
-							new RetryException("Could not complete the operation. Number of retries " +
-								"has been exhausted.", throwable));
+						java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
+						exceptionResult.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
+							"has been exhausted.", throwable));
+						return exceptionResult;
 					}
 				} else {
-					return FlinkCompletableFuture.completed(t);
+					return java.util.concurrent.CompletableFuture.completedFuture(t);
 				}
-			}
-		}, executor)
-		.thenCompose(new ApplyFunction<Future<T>, Future<T>>() {
-			@Override
-			public Future<T> apply(Future<T> value) {
-				return value;
-			}
-		});
+			},
+			executor)
+		.thenCompose(value -> value);
 	}
 
 	public static class RetryException extends Exception {
@@ -121,17 +117,17 @@ public class FutureUtils {
 	 * @param futures The futures that make up the conjunction. No null entries are allowed.
 	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
 	 */
-	public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends Future<? extends T>> futures) {
+	public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends java.util.concurrent.CompletableFuture<? extends T>> futures) {
 		checkNotNull(futures, "futures");
 
 		final ResultConjunctFuture<T> conjunct = new ResultConjunctFuture<>(futures.size());
 
 		if (futures.isEmpty()) {
-			conjunct.complete(Collections.<T>emptyList());
+			conjunct.complete(Collections.emptyList());
 		}
 		else {
-			for (Future<? extends T> future : futures) {
-				future.handle(conjunct.completionHandler);
+			for (java.util.concurrent.CompletableFuture<? extends T> future : futures) {
+				future.whenComplete(conjunct::handleCompletedFuture);
 			}
 		}
 
@@ -149,7 +145,7 @@ public class FutureUtils {
 	 * @param futures The futures to wait on. No null entries are allowed.
 	 * @return The WaitingFuture that completes once all given futures are complete (or one fails).
 	 */
-	public static ConjunctFuture<Void> waitForAll(Collection<? extends Future<?>> futures) {
+	public static ConjunctFuture<Void> waitForAll(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
 		checkNotNull(futures, "futures");
 
 		return new WaitingConjunctFuture(futures);
@@ -164,25 +160,25 @@ public class FutureUtils {
 	 * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
 	 * many of the Futures are already complete.
 	 */
-	public interface ConjunctFuture<T> extends CompletableFuture<T> {
+	public abstract static class ConjunctFuture<T> extends java.util.concurrent.CompletableFuture<T> {
 
 		/**
 		 * Gets the total number of Futures in the conjunction.
 		 * @return The total number of Futures in the conjunction.
 		 */
-		int getNumFuturesTotal();
+		public abstract int getNumFuturesTotal();
 
 		/**
 		 * Gets the number of Futures in the conjunction that are already complete.
 		 * @return The number of Futures in the conjunction that are already complete
 		 */
-		int getNumFuturesCompleted();
+		public abstract int getNumFuturesCompleted();
 	}
 
 	/**
 	 * The implementation of the {@link ConjunctFuture} which returns its Futures' result as a collection.
 	 */
-	private static class ResultConjunctFuture<T> extends FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<Collection<T>> {
+	private static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T>> {
 
 		/** The total number of futures in the conjunction */
 		private final int numTotal;
@@ -199,25 +195,19 @@ public class FutureUtils {
 		/** The function that is attached to all futures in the conjunction. Once a future
 		 * is complete, this function tracks the completion or fails the conjunct.
 		 */
-		final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable, Void>() {
-
-			@Override
-			public Void apply(T o, Throwable throwable) {
-				if (throwable != null) {
-					completeExceptionally(throwable);
-				} else {
-					int index = nextIndex.getAndIncrement();
+		final void handleCompletedFuture(T value, Throwable throwable) {
+			if (throwable != null) {
+				completeExceptionally(throwable);
+			} else {
+				int index = nextIndex.getAndIncrement();
 
-					results[index] = o;
+				results[index] = value;
 
-					if (numCompleted.incrementAndGet() == numTotal) {
-						complete(Arrays.asList(results));
-					}
+				if (numCompleted.incrementAndGet() == numTotal) {
+					complete(Arrays.asList(results));
 				}
-
-				return null;
 			}
-		};
+		}
 
 		@SuppressWarnings("unchecked")
 		ResultConjunctFuture(int numTotal) {
@@ -240,7 +230,7 @@ public class FutureUtils {
 	 * Implementation of the {@link ConjunctFuture} interface which waits only for the completion
 	 * of its futures and does not return their values.
 	 */
-	private static final class WaitingConjunctFuture extends FlinkCompletableFuture<Void> implements ConjunctFuture<Void> {
+	private static final class WaitingConjunctFuture extends ConjunctFuture<Void> {
 
 		/** Number of completed futures */
 		private final AtomicInteger numCompleted = new AtomicInteger(0);
@@ -248,23 +238,18 @@ public class FutureUtils {
 		/** Total number of futures to wait on */
 		private final int numTotal;
 
-		/** Handler which increments the atomic completion counter and completes or fails the WaitingFutureImpl */
-		private final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
-			@Override
-			public Void apply(Object o, Throwable throwable) {
-				if (throwable == null) {
-					if (numTotal == numCompleted.incrementAndGet()) {
-						complete(null);
-					}
-				} else {
-					completeExceptionally(throwable);
+		/** Method which increments the atomic completion counter and completes or fails the WaitingFutureImpl */
+		private void handleCompletedFuture(Object ignored, Throwable throwable) {
+			if (throwable == null) {
+				if (numTotal == numCompleted.incrementAndGet()) {
+					complete(null);
 				}
-
-				return null;
+			} else {
+				completeExceptionally(throwable);
 			}
-		};
+		}
 
-		private WaitingConjunctFuture(Collection<? extends Future<?>> futures) {
+		private WaitingConjunctFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
 			Preconditions.checkNotNull(futures, "Futures must not be null.");
 
 			this.numTotal = futures.size();
@@ -272,8 +257,8 @@ public class FutureUtils {
 			if (futures.isEmpty()) {
 				complete(null);
 			} else {
-				for (Future<?> future : futures) {
-					future.handle(completionHandler);
+				for (java.util.concurrent.CompletableFuture<?> future : futures) {
+					future.whenComplete(this::handleCompletedFuture);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c0f1f39..66dee0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -25,12 +25,7 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -56,7 +51,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
@@ -129,7 +124,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
 
 	/** A future that completes once the Execution reaches a terminal ExecutionState */
-	private final FlinkCompletableFuture<ExecutionState> terminationFuture;
+	private final CompletableFuture<ExecutionState> terminationFuture;
 
 	private volatile ExecutionState state = CREATED;
 
@@ -189,7 +184,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		markTimestamp(ExecutionState.CREATED, startTimestamp);
 
 		this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
-		this.terminationFuture = new FlinkCompletableFuture<>();
+		this.terminationFuture = new CompletableFuture<>();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -279,7 +274,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 *
 	 * @return A future for the execution's termination
 	 */
-	public Future<ExecutionState> getTerminationFuture() {
+	public CompletableFuture<ExecutionState> getTerminationFuture() {
 		return terminationFuture;
 	}
 
@@ -306,14 +301,13 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 */
 	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
 		try {
-			final Future<SimpleSlot> slotAllocationFuture = allocateSlotForExecution(slotProvider, queued);
+			final CompletableFuture<SimpleSlot> slotAllocationFuture = allocateSlotForExecution(slotProvider, queued);
 
 			// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
 			// that we directly deploy the tasks if the slot allocation future is completed. This is
 			// necessary for immediate deployment.
-			final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
-				@Override
-				public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+			final CompletableFuture<Void> deploymentFuture = slotAllocationFuture.handle(
+				(simpleSlot, throwable) ->  {
 					if (simpleSlot != null) {
 						try {
 							deployToSlot(simpleSlot);
@@ -330,7 +324,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					}
 					return null;
 				}
-			});
+			);
 
 			// if tasks have to scheduled immediately check that the task has been deployed
 			if (!queued && !deploymentFuture.isDone()) {
@@ -344,7 +338,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	public Future<SimpleSlot> allocateSlotForExecution(SlotProvider slotProvider, boolean queued) 
+	public CompletableFuture<SimpleSlot> allocateSlotForExecution(SlotProvider slotProvider, boolean queued)
 			throws IllegalExecutionStateException {
 
 		checkNotNull(slotProvider);
@@ -365,7 +359,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					new ScheduledUnit(this, sharingGroup) :
 					new ScheduledUnit(this, sharingGroup, locationConstraint);
 
-			return slotProvider.allocateSlot(toSchedule, queued);
+			return FutureUtils.toJava(slotProvider.allocateSlot(toSchedule, queued));
 		}
 		else {
 			// call race, already deployed, or already done
@@ -424,24 +418,25 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
-			final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
-
-			submitResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-				@Override
-				public Void apply(Throwable failure) {
-					if (failure instanceof TimeoutException) {
-						String taskname = vertex.getTaskNameWithSubtaskIndex()+ " (" + attemptId + ')';
-
-						markFailed(new Exception(
-							"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
-								+ ") not responding after a timeout of " + timeout, failure));
-					}
-					else {
-						markFailed(failure);
+			final CompletableFuture<Acknowledge> submitResultFuture = FutureUtils.toJava(
+				taskManagerGateway.submitTask(deployment, timeout));
+
+			submitResultFuture.whenCompleteAsync(
+				(ack, failure) -> {
+					// only respond to the failure case
+					if (failure != null) {
+						if (failure instanceof TimeoutException) {
+							String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
+
+							markFailed(new Exception(
+								"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
+									+ ") not responding after a timeout of " + timeout, failure));
+						} else {
+							markFailed(failure);
+						}
 					}
-					return null;
-				}
-			}, executor);
+				},
+				executor);
 		}
 		catch (Throwable t) {
 			markFailed(t);
@@ -458,24 +453,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
-			Future<Acknowledge> stopResultFuture = FutureUtils.retry(
-				new Callable<Future<Acknowledge>>() {
-
-					@Override
-					public Future<Acknowledge> call() throws Exception {
-						return taskManagerGateway.stopTask(attemptId, timeout);
-					}
-				},
+			CompletableFuture<Acknowledge> stopResultFuture = FutureUtils.retry(
+				() -> FutureUtils.toJava(taskManagerGateway.stopTask(attemptId, timeout)),
 				NUM_STOP_CALL_TRIES,
 				executor);
 
-			stopResultFuture.exceptionally(new ApplyFunction<Throwable, Void>() {
-				@Override
-				public Void apply(Throwable failure) {
+			stopResultFuture.exceptionally(
+				failure -> {
 					LOG.info("Stopping task was not successful.", failure);
 					return null;
-				}
-			});
+				});
 		}
 	}
 
@@ -575,9 +562,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				// TODO The current approach may send many update messages even though the consuming
 				// task has already been deployed with all necessary information. We have to check
 				// whether this is a problem and fix it, if it is.
-				FlinkFuture.supplyAsync(new Callable<Void>(){
-					@Override
-					public Void call() throws Exception {
+				CompletableFuture.supplyAsync(
+					() -> {
 						try {
 							consumerVertex.scheduleForExecution(
 									consumerVertex.getExecutionGraph().getSlotProvider(),
@@ -588,8 +574,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 						}
 
 						return null;
-					}
-				}, executor);
+					},
+					executor);
 
 				// double check to resolve race conditions
 				if(consumerVertex.getExecutionState() == RUNNING){
@@ -681,7 +667,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param timeout until the request times out
 	 * @return Future stack trace sample response
 	 */
-	public Future<StackTraceSampleResponse> requestStackTraceSample(
+	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
 			int sampleId,
 			int numSamples,
 			Time delayBetweenSamples,
@@ -693,15 +679,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
-			return taskManagerGateway.requestStackTraceSample(
-				attemptId,
-				sampleId,
-				numSamples,
-				delayBetweenSamples,
-				maxStrackTraceDepth,
-				timeout);
+			return FutureUtils.toJava(
+				taskManagerGateway.requestStackTraceSample(
+					attemptId,
+					sampleId,
+					numSamples,
+					delayBetweenSamples,
+					maxStrackTraceDepth,
+					timeout));
 		} else {
-			return FlinkCompletableFuture.completedExceptionally(new Exception("The execution has no slot assigned."));
+			CompletableFuture<StackTraceSampleResponse> result = new CompletableFuture<>();
+			result.completeExceptionally(new Exception("The execution has no slot assigned."));
+
+			return result;
 		}
 	}
 
@@ -1023,23 +1013,18 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
-			Future<Acknowledge> cancelResultFuture = FutureUtils.retry(
-				new Callable<Future<Acknowledge>>() {
-					@Override
-					public Future<Acknowledge> call() throws Exception {
-						return taskManagerGateway.cancelTask(attemptId, timeout);
-					}
-				},
+			CompletableFuture<Acknowledge> cancelResultFuture = FutureUtils.retry(
+				() -> FutureUtils.toJava(taskManagerGateway.cancelTask(attemptId, timeout)),
 				NUM_CANCEL_CALL_TRIES,
 				executor);
 
-			cancelResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-				@Override
-				public Void apply(Throwable failure) {
-					fail(new Exception("Task could not be canceled.", failure));
-					return null;
-				}
-			}, executor);
+			cancelResultFuture.whenCompleteAsync(
+				(ack, failure) -> {
+					if (failure != null) {
+						fail(new Exception("Task could not be canceled.", failure));
+					}
+				},
+				executor);
 		}
 	}
 
@@ -1068,16 +1053,17 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 			final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
 
-			Future<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, timeout);
+			CompletableFuture<Acknowledge> updatePartitionsResultFuture = FutureUtils.toJava(
+				taskManagerGateway.updatePartitions(attemptId, partitionInfos, timeout));
 
-			updatePartitionsResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-				@Override
-				public Void apply(Throwable failure) {
-					fail(new IllegalStateException("Update task on TaskManager " + taskManagerLocation +
-						" failed due to:", failure));
-					return null;
-				}
-			}, executor);
+			updatePartitionsResultFuture.whenCompleteAsync(
+				(ack, failure) -> {
+					// fail if there was a failure
+					if (failure != null) {
+						fail(new IllegalStateException("Update task on TaskManager " + taskManagerLocation +
+							" failed due to:", failure));
+					}
+				}, executor);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
index ea6186e..123ff0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.SimpleSlot;
 
+import java.util.concurrent.CompletableFuture;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -30,9 +31,9 @@ public class ExecutionAndSlot {
 
 	public final Execution executionAttempt;
 
-	public final Future<SimpleSlot> slotFuture;
+	public final CompletableFuture<SimpleSlot> slotFuture;
 
-	public ExecutionAndSlot(Execution executionAttempt, Future<SimpleSlot> slotFuture) {
+	public ExecutionAndSlot(Execution executionAttempt, CompletableFuture<SimpleSlot> slotFuture) {
 		this.executionAttempt = checkNotNull(executionAttempt);
 		this.slotFuture = checkNotNull(slotFuture);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index dded029..ae9b5f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -38,14 +38,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -91,6 +86,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
@@ -793,7 +790,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			newExecJobVertices.add(ejv);
 		}
 
-		terminationFuture = new FlinkCompletableFuture<>();
+		terminationFuture = new CompletableFuture<>();
 		failoverStrategy.notifyNewVertices(newExecJobVertices);
 	}
 
@@ -852,7 +849,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 		try {
 			// collecting all the slots may resize and fail in that operation without slots getting lost
-			final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+			final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
 			// allocate the slots (obtain all their futures
 			for (ExecutionJobVertex ejv : getVerticesTopologically()) {
@@ -887,10 +884,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			}, timeout.getSize(), timeout.getUnit());
 
 
-			allAllocationsComplete.handleAsync(new BiFunction<Void, Throwable, Void>() {
-
-				@Override
-				public Void apply(Void slots, Throwable throwable) {
+			allAllocationsComplete.handleAsync(
+				(Void slots, Throwable throwable) -> {
 					try {
 						// we do not need the cancellation timeout any more
 						timeoutCancelHandle.cancel(false);
@@ -907,9 +902,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 										slot = execAndSlot.slotFuture.getNow(null);
 										checkNotNull(slot);
 									}
-									catch (ExecutionException | NullPointerException e) {
+									catch (CompletionException | NullPointerException e) {
 										throw new IllegalStateException("SlotFuture is incomplete " +
-												"or erroneous even though all futures completed");
+												"or erroneous even though all futures completed", e);
 									}
 
 									// actual deployment
@@ -938,8 +933,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					// Wouldn't it be nice if we could return an actual Void object?
 					// return (Void) Unsafe.getUnsafe().allocateInstance(Void.class);
 					return null; 
-				}
-			}, futureExecutor);
+				},
+				futureExecutor);
 
 			// from now on, slots will be rescued by the the futures and their completion, or by the timeout
 			successful = true;
@@ -964,7 +959,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					// make sure no concurrent local actions interfere with the cancellation
 					final long globalVersionForRestart = incrementGlobalModVersion();
 
-					final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
+					final ArrayList<CompletableFuture<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
 
 					// cancel all tasks (that still need cancelling)
 					for (ExecutionJobVertex ejv : verticesInCreationOrder) {
@@ -973,14 +968,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 					// we build a future that is complete once all vertices have reached a terminal state
 					final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
-					allTerminal.thenAccept(new AcceptFunction<Void>() {
-						@Override
-						public void accept(Void value) {
+					allTerminal.thenAccept(
+						(Void value) -> {
 							// cancellations may currently be overridden by failures which trigger
 							// restarts, so we need to pass a proper restart global version here
 							allVerticesInTerminalState(globalVersionForRestart);
 						}
-					});
+					);
 
 					return;
 				}
@@ -1126,7 +1120,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				final long globalVersionForRestart = incrementGlobalModVersion();
 
 				// we build a future that is complete once all vertices have reached a terminal state
-				final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
+				final ArrayList<CompletableFuture<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
 
 				// cancel all tasks (that still need cancelling)
 				for (ExecutionJobVertex ejv : verticesInCreationOrder) {
@@ -1134,12 +1128,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				}
 
 				final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
-				allTerminal.thenAccept(new AcceptFunction<Void>() {
-					@Override
-					public void accept(Void value) {
-						allVerticesInTerminalState(globalVersionForRestart);
-					}
-				});
+				allTerminal.thenAccept((Void value) -> allVerticesInTerminalState(globalVersionForRestart));
 
 				return;
 			}
@@ -1250,7 +1239,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	@VisibleForTesting
-	public Future<JobStatus> getTerminationFuture() {
+	public CompletableFuture<JobStatus> getTerminationFuture() {
 		return terminationFuture;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
index cd6d6aa..8558533 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Utilities for dealing with the execution graphs and scheduling.
@@ -40,8 +40,8 @@ public class ExecutionGraphUtils {
 	 * 
 	 * @param slotFuture The future for the slot to release.
 	 */
-	public static void releaseSlotFuture(Future<SimpleSlot> slotFuture) {
-		slotFuture.handle(ReleaseSlotFunction.INSTANCE);
+	public static void releaseSlotFuture(CompletableFuture<SimpleSlot> slotFuture) {
+		slotFuture.handle(ReleaseSlotFunction.INSTANCE::apply);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index f5a592a..5ee7a9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -31,8 +31,6 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -53,10 +51,12 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * An {@code ExecutionJobVertex} is part of the {@link ExecutionGraph}, and the peer 
@@ -475,7 +475,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 			try {
 				// allocate the next slot (future)
 				final Execution exec = vertices[i].getCurrentExecutionAttempt();
-				final Future<SimpleSlot> future = exec.allocateSlotForExecution(resourceProvider, queued);
+				final CompletableFuture<SimpleSlot> future = exec.allocateSlotForExecution(resourceProvider, queued);
 				slots[i] = new ExecutionAndSlot(exec, future);
 				successful = true;
 			}
@@ -507,17 +507,14 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 * 
 	 * @return A future that is complete once all tasks have canceled.
 	 */
-	public Future<Void> cancelWithFuture() {
+	public CompletableFuture<Void> cancelWithFuture() {
 		// we collect all futures from the task cancellations
-		ArrayList<Future<ExecutionState>> futures = new ArrayList<>(parallelism);
-
-		// cancel each vertex
-		for (ExecutionVertex ev : getTaskVertices()) {
-			futures.add(ev.cancel());
-		}
+		CompletableFuture<ExecutionState>[] futures = Arrays.stream(getTaskVertices())
+			.map(ExecutionVertex::cancel)
+			.<CompletableFuture<ExecutionState>>toArray(CompletableFuture[]::new);
 
 		// return a conjunct future, which is complete once all individual tasks are canceled
-		return FutureUtils.waitForAll(futures);
+		return CompletableFuture.allOf(futures);
 	}
 
 	public void fail(Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e8c1984..0ff71e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -61,6 +60,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
@@ -604,7 +604,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	 *  
 	 * @return A future that completes once the execution has reached its final state.
 	 */
-	public Future<ExecutionState> cancel() {
+	public CompletableFuture<ExecutionState> cancel() {
 		// to avoid any case of mixup in the presence of concurrent calls,
 		// we copy a reference to the stack to make sure both calls go to the same Execution 
 		final Execution exec = this.currentExecution;

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 6066c77..1919c61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph.failover;
 
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -37,6 +35,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
@@ -143,7 +142,7 @@ public class FailoverRegion {
 				if (transitionState(curStatus, JobStatus.CANCELLING)) {
 
 					// we build a future that is complete once all vertices have reached a terminal state
-					final ArrayList<Future<?>> futures = new ArrayList<>(connectedExecutionVertexes.size());
+					final ArrayList<CompletableFuture<?>> futures = new ArrayList<>(connectedExecutionVertexes.size());
 
 					// cancel all tasks (that still need cancelling)
 					for (ExecutionVertex vertex : connectedExecutionVertexes) {
@@ -151,12 +150,9 @@ public class FailoverRegion {
 					}
 
 					final FutureUtils.ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
-					allTerminal.thenAcceptAsync(new AcceptFunction<Void>() {
-						@Override
-						public void accept(Void value) {
-							allVerticesInTerminalState(globalModVersionOfFailover);
-						}
-					}, executor);
+					allTerminal.thenAcceptAsync(
+						(Void value) -> allVerticesInTerminalState(globalModVersionOfFailover),
+						executor);
 
 					break;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
index 0e7bca5..80f1d2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.executiongraph.failover;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -36,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -107,14 +106,13 @@ public class RestartIndividualStrategy extends FailoverStrategy {
 		// Note: currently all tasks passed here are already in their terminal state,
 		//       so we could actually avoid the future. We use it anyways because it is cheap and
 		//       it helps to support better testing
-		final Future<ExecutionState> terminationFuture = taskExecution.getTerminationFuture();
+		final CompletableFuture<ExecutionState> terminationFuture = taskExecution.getTerminationFuture();
 
 		final ExecutionVertex vertexToRecover = taskExecution.getVertex(); 
 		final long globalModVersion = taskExecution.getGlobalModVersion();
 
-		terminationFuture.thenAcceptAsync(new AcceptFunction<ExecutionState>() {
-			@Override
-			public void accept(ExecutionState value) {
+		terminationFuture.thenAcceptAsync(
+			(ExecutionState value) -> {
 				try {
 					long createTimestamp = System.currentTimeMillis();
 					Execution newExecution = vertexToRecover.resetForNewExecution(createTimestamp, globalModVersion);
@@ -127,8 +125,8 @@ public class RestartIndividualStrategy extends FailoverStrategy {
 					executionGraph.failGlobal(
 							new Exception("Error during fine grained recovery - triggering full recovery", e));
 				}
-			}
-		}, callbackExecutor);
+			},
+			callbackExecutor);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 7100e79..22271af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -32,7 +31,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -229,7 +228,7 @@ public class BlobServerDeleteTest extends TestLogger {
 		final int concurrentDeleteOperations = 3;
 		final ExecutorService executor = Executors.newFixedThreadPool(concurrentDeleteOperations);
 
-		final List<Future<Void>> deleteFutures = new ArrayList<>(concurrentDeleteOperations);
+		final List<CompletableFuture<Void>> deleteFutures = new ArrayList<>(concurrentDeleteOperations);
 
 		final byte[] data = {1, 2, 3};
 
@@ -244,21 +243,22 @@ public class BlobServerDeleteTest extends TestLogger {
 			assertTrue(blobServer.getStorageLocation(blobKey).exists());
 
 			for (int i = 0; i < concurrentDeleteOperations; i++) {
-				Future<Void> deleteFuture = FlinkCompletableFuture.supplyAsync(new Callable<Void>() {
-					@Override
-					public Void call() throws Exception {
+				CompletableFuture<Void> deleteFuture = CompletableFuture.supplyAsync(
+					() -> {
 						try (BlobClient blobClient = blobServer.createClient()) {
 							blobClient.delete(blobKey);
+						} catch (IOException e) {
+							throw new FlinkFutureException("Could not delete the given blob key " + blobKey + '.', e);
 						}
 
 						return null;
-					}
-				}, executor);
+					},
+					executor);
 
 				deleteFutures.add(deleteFuture);
 			}
 
-			Future<Void> waitFuture = FutureUtils.waitForAll(deleteFutures);
+			CompletableFuture<Void> waitFuture = FutureUtils.waitForAll(deleteFutures);
 
 			// make sure all delete operation have completed successfully
 			// in case of no lock, one of the delete operations should eventually fail

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 3209648..73827bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -22,9 +22,8 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
@@ -44,7 +43,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -173,7 +172,7 @@ public class BlobServerGetTest extends TestLogger {
 		final BlobStore blobStore = mock(BlobStore.class);
 
 		final int numberConcurrentGetOperations = 3;
-		final List<Future<InputStream>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
+		final List<CompletableFuture<InputStream>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
 
 		final byte[] data = {1, 2, 3, 4, 99, 42};
 		final ByteArrayInputStream bais = new ByteArrayInputStream(data);
@@ -200,9 +199,8 @@ public class BlobServerGetTest extends TestLogger {
 
 		try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
 			for (int i = 0; i < numberConcurrentGetOperations; i++) {
-				Future<InputStream> getOperation = FlinkCompletableFuture.supplyAsync(new Callable<InputStream>() {
-					@Override
-					public InputStream call() throws Exception {
+				CompletableFuture<InputStream> getOperation = CompletableFuture.supplyAsync(
+					() -> {
 						try (BlobClient blobClient = blobServer.createClient();
 							 InputStream inputStream = blobClient.get(blobKey)) {
 							byte[] buffer = new byte[data.length];
@@ -210,14 +208,16 @@ public class BlobServerGetTest extends TestLogger {
 							IOUtils.readFully(inputStream, buffer);
 
 							return new ByteArrayInputStream(buffer);
+						} catch (IOException e) {
+							throw new FlinkFutureException("Could not read blob for key " + blobKey + '.', e);
 						}
-					}
-				}, executor);
+					},
+					executor);
 
 				getOperations.add(getOperation);
 			}
 
-			Future<Collection<InputStream>> inputStreamsFuture = FutureUtils.combineAll(getOperations);
+			CompletableFuture<Collection<InputStream>> inputStreamsFuture = FutureUtils.combineAll(getOperations);
 
 			Collection<InputStream> inputStreams = inputStreamsFuture.get();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 8b8ddf9..80c6822 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -20,9 +20,8 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -37,7 +36,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Random;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -314,7 +313,7 @@ public class BlobServerPutTest extends TestLogger {
 		final CountDownLatch countDownLatch = new CountDownLatch(concurrentPutOperations);
 		final byte[] data = new byte[dataSize];
 
-		ArrayList<Future<BlobKey>> allFutures = new ArrayList(concurrentPutOperations);
+		ArrayList<CompletableFuture<BlobKey>> allFutures = new ArrayList(concurrentPutOperations);
 
 		ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations);
 
@@ -322,14 +321,15 @@ public class BlobServerPutTest extends TestLogger {
 			final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
 
 			for (int i = 0; i < concurrentPutOperations; i++) {
-				Future<BlobKey> putFuture = FlinkCompletableFuture.supplyAsync(new Callable<BlobKey>() {
-					@Override
-					public BlobKey call() throws Exception {
+				CompletableFuture<BlobKey> putFuture = CompletableFuture.supplyAsync(
+					() -> {
 						try (BlobClient blobClient = blobServer.createClient()) {
 							return blobClient.put(new BlockingInputStream(countDownLatch, data));
+						} catch (IOException e) {
+							throw new FlinkFutureException("Could not upload blob.", e);
 						}
-					}
-				}, executor);
+					},
+					executor);
 
 				allFutures.add(putFuture);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index e262459..cc95e7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 
 import org.apache.flink.util.TestLogger;
 import org.hamcrest.collection.IsIterableContainingInAnyOrder;
@@ -31,6 +30,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.junit.Assert.*;
@@ -58,9 +58,9 @@ public class FutureUtilsTest extends TestLogger{
 
 		try {
 			futureFactory.createFuture(Arrays.asList(
-					new FlinkCompletableFuture<Object>(),
+					new CompletableFuture<>(),
 					null,
-					new FlinkCompletableFuture<Object>()));
+					new CompletableFuture<>()));
 			fail();
 		} catch (NullPointerException ignored) {}
 	}
@@ -68,10 +68,10 @@ public class FutureUtilsTest extends TestLogger{
 	@Test
 	public void testConjunctFutureCompletion() throws Exception {
 		// some futures that we combine
-		CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
-		CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
-		CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
-		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
 
 		// some future is initially completed
 		future2.complete(new Object());
@@ -79,10 +79,7 @@ public class FutureUtilsTest extends TestLogger{
 		// build the conjunct future
 		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
 
-		Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() {
-			@Override
-			public void accept(Object value) {}
-		});
+		CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
 
 		assertEquals(4, result.getNumFuturesTotal());
 		assertEquals(1, result.getNumFuturesCompleted());
@@ -116,18 +113,15 @@ public class FutureUtilsTest extends TestLogger{
 	@Test
 	public void testConjunctFutureFailureOnFirst() throws Exception {
 
-		CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
-		CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
-		CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
-		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
 
 		// build the conjunct future
 		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
 
-		Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() {
-			@Override
-			public void accept(Object value) {}
-		});
+		CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
 
 		assertEquals(4, result.getNumFuturesTotal());
 		assertEquals(0, result.getNumFuturesCompleted());
@@ -158,19 +152,16 @@ public class FutureUtilsTest extends TestLogger{
 	@Test
 	public void testConjunctFutureFailureOnSuccessive() throws Exception {
 
-		CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
-		CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
-		CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
-		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
 
 		// build the conjunct future
 		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
 		assertEquals(4, result.getNumFuturesTotal());
 
-		Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() {
-			@Override
-			public void accept(Object value) {}
-		});
+		java.util.concurrent.CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
 
 		future1.complete(new Object());
 		future3.complete(new Object());
@@ -202,11 +193,11 @@ public class FutureUtilsTest extends TestLogger{
 	 */
 	@Test
 	public void testConjunctFutureValue() throws ExecutionException, InterruptedException {
-		CompletableFuture<Integer> future1 = FlinkCompletableFuture.completed(1);
-		CompletableFuture<Long> future2 = FlinkCompletableFuture.completed(2L);
-		CompletableFuture<Double> future3 = new FlinkCompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Integer> future1 = java.util.concurrent.CompletableFuture.completedFuture(1);
+		java.util.concurrent.CompletableFuture<Long> future2 = java.util.concurrent.CompletableFuture.completedFuture(2L);
+		java.util.concurrent.CompletableFuture<Double> future3 = new java.util.concurrent.CompletableFuture<>();
 
-		ConjunctFuture<Collection<Number>> result = FutureUtils.<Number>combineAll(Arrays.asList(future1, future2, future3));
+		ConjunctFuture<Collection<Number>> result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3));
 
 		assertFalse(result.isDone());
 
@@ -219,7 +210,7 @@ public class FutureUtilsTest extends TestLogger{
 
 	@Test
 	public void testConjunctOfNone() throws Exception {
-		final ConjunctFuture<?> result = futureFactory.createFuture(Collections.<Future<Object>>emptyList());
+		final ConjunctFuture<?> result = futureFactory.createFuture(Collections.<java.util.concurrent.CompletableFuture<Object>>emptyList());
 
 		assertEquals(0, result.getNumFuturesTotal());
 		assertEquals(0, result.getNumFuturesCompleted());
@@ -230,13 +221,13 @@ public class FutureUtilsTest extends TestLogger{
 	 * Factory to create {@link ConjunctFuture} for testing.
 	 */
 	private interface FutureFactory {
-		ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures);
+		ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures);
 	}
 
 	private static class ConjunctFutureFactory implements FutureFactory {
 
 		@Override
-		public ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures) {
+		public ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
 			return FutureUtils.combineAll(futures);
 		}
 	}
@@ -244,7 +235,7 @@ public class FutureUtilsTest extends TestLogger{
 	private static class WaitingFutureFactory implements FutureFactory {
 
 		@Override
-		public ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures) {
+		public ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
 			return FutureUtils.waitForAll(futures);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
index 2e6da98..c616501 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
@@ -34,6 +33,7 @@ import org.junit.Test;
 import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Mockito.*;
 
@@ -51,12 +51,12 @@ public class ExecutionGraphUtilsTest {
 		final SimpleSlot slot2 = new SimpleSlot(createAllocatedSlot(jid, 1), owner, 1);
 		final SimpleSlot slot3 = new SimpleSlot(createAllocatedSlot(jid, 2), owner, 2);
 
-		final FlinkCompletableFuture<SimpleSlot> incompleteFuture = new FlinkCompletableFuture<>();
+		final CompletableFuture<SimpleSlot> incompleteFuture = new CompletableFuture<>();
 
-		final FlinkCompletableFuture<SimpleSlot> completeFuture = new FlinkCompletableFuture<>();
+		final CompletableFuture<SimpleSlot> completeFuture = new CompletableFuture<>();
 		completeFuture.complete(slot2);
 
-		final FlinkCompletableFuture<SimpleSlot> disposedSlotFuture = new FlinkCompletableFuture<>();
+		final CompletableFuture<SimpleSlot> disposedSlotFuture = new CompletableFuture<>();
 		slot3.releaseSlot();
 		disposedSlotFuture.complete(slot3);
 
@@ -89,16 +89,16 @@ public class ExecutionGraphUtilsTest {
 
 		ExecutionAndSlot[] slots1 = new ExecutionAndSlot[] {
 				null,
-				new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot1)),
+				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot1)),
 				null,
-				new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot2)),
+				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot2)),
 				null
 		};
 
 		ExecutionAndSlot[] slots2 = new ExecutionAndSlot[] {
-				new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot3)),
-				new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot4)),
-				new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot5))
+				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot3)),
+				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot4)),
+				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot5))
 		};
 
 		List<ExecutionAndSlot[]> resources = Arrays.asList(null, slots1, new ExecutionAndSlot[0], null, slots2);