You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/02 12:29:14 UTC

flink git commit: [FLINK-7320] [futures] Replace Flink's futures with Java 8's CompletableFuture in Scheduler

Repository: flink
Updated Branches:
  refs/heads/master 9de270cb4 -> 32bc67e6c


[FLINK-7320] [futures] Replace Flink's futures with Java 8's CompletableFuture in Scheduler

Address PR comments

This closes #4435.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32bc67e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32bc67e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32bc67e6

Branch: refs/heads/master
Commit: 32bc67e6c64d3c0c6b49523f799a8b1fbb2dd80c
Parents: 9de270c
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 18:37:00 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Aug 2 11:18:19 2017 +0200

----------------------------------------------------------------------
 .../webmonitor/StackTraceSampleCoordinator.java |  9 ++--
 .../checkpoint/CheckpointCoordinator.java       |  9 ++--
 .../flink/runtime/concurrent/FutureUtils.java   |  8 +---
 .../flink/runtime/executiongraph/Execution.java |  7 +--
 .../apache/flink/runtime/instance/SlotPool.java |  4 +-
 .../flink/runtime/instance/SlotProvider.java    |  7 +--
 .../runtime/jobmanager/scheduler/Scheduler.java | 19 ++++----
 .../ExecutionGraphMetricsTest.java              |  3 +-
 .../ExecutionGraphSchedulingTest.java           | 49 ++++++++------------
 .../ExecutionVertexSchedulingTest.java          |  9 ++--
 .../executiongraph/ProgrammedSlotProvider.java  | 18 +++----
 .../utils/SimpleSlotProvider.java               | 10 ++--
 .../scheduler/SchedulerIsolatedTasksTest.java   | 18 ++++---
 13 files changed, 74 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
index df15b48..26e8a93 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -125,18 +126,14 @@ public class StackTraceSampleCoordinator {
 				executions[i] = execution;
 				triggerIds[i] = execution.getAttemptId();
 			} else {
-				CompletableFuture<StackTraceSample> result = new CompletableFuture();
-				result.completeExceptionally(new IllegalStateException("Task " + tasksToSample[i]
+				return FutureUtils.completedExceptionally(new IllegalStateException("Task " + tasksToSample[i]
 					.getTaskNameWithSubtaskIndex() + " is not running."));
-				return result;
 			}
 		}
 
 		synchronized (lock) {
 			if (isShutDown) {
-				CompletableFuture<StackTraceSample> result = new CompletableFuture();
-				result.completeExceptionally(new IllegalStateException("Shut down"));
-				return result;
+				return FutureUtils.completedExceptionally(new IllegalStateException("Shut down"));
 			}
 
 			final int sampleId = sampleIdCounter++;

http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5cab7f8..6f41867 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -381,9 +382,7 @@ public class CheckpointCoordinator {
 			result = triggerResult.getPendingCheckpoint().getCompletionFuture();
 		} else {
 			Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message());
-			result = new CompletableFuture<>();
-			result.completeExceptionally(cause);
-			return result;
+			return FutureUtils.completedExceptionally(cause);
 		}
 
 		// Make sure to remove the created base directory on Exceptions
@@ -439,9 +438,7 @@ public class CheckpointCoordinator {
 					return triggerResult.getPendingCheckpoint().getCompletionFuture();
 				} else {
 					Throwable cause = new Exception("Failed to trigger checkpoint: " + triggerResult.getFailureReason().message());
-					CompletableFuture<CompletedCheckpoint> failedResult = new CompletableFuture<>();
-					failedResult.completeExceptionally(cause);
-					return failedResult;
+					return FutureUtils.completedExceptionally(cause);
 				}
 
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 70550ad..cf218c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -60,9 +60,7 @@ public class FutureUtils {
 		try {
 			operationResultFuture = operation.call();
 		} catch (Exception e) {
-			java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
-			exceptionResult.completeExceptionally(new RetryException("Could not execute the provided operation.", e));
-			return exceptionResult;
+			return FutureUtils.completedExceptionally(new RetryException("Could not execute the provided operation.", e));
 		}
 
 		return operationResultFuture.handleAsync(
@@ -71,10 +69,8 @@ public class FutureUtils {
 					if (retries > 0) {
 						return retry(operation, retries - 1, executor);
 					} else {
-						java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
-						exceptionResult.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
+						return FutureUtils.<T>completedExceptionally(new RetryException("Could not complete the operation. Number of retries " +
 							"has been exhausted.", throwable));
-						return exceptionResult;
 					}
 				} else {
 					return java.util.concurrent.CompletableFuture.completedFuture(t);

http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 66dee0a..5cb12ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -359,7 +359,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					new ScheduledUnit(this, sharingGroup) :
 					new ScheduledUnit(this, sharingGroup, locationConstraint);
 
-			return FutureUtils.toJava(slotProvider.allocateSlot(toSchedule, queued));
+			return slotProvider.allocateSlot(toSchedule, queued);
 		}
 		else {
 			// call race, already deployed, or already done
@@ -688,10 +688,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					maxStrackTraceDepth,
 					timeout));
 		} else {
-			CompletableFuture<StackTraceSampleResponse> result = new CompletableFuture<>();
-			result.completeExceptionally(new Exception("The execution has no slot assigned."));
-
-			return result;
+			return FutureUtils.completedExceptionally(new Exception("The execution has no slot assigned."));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index c74d9a6..9a26779 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -980,11 +980,11 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 		}
 
 		@Override
-		public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+		public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
 			Iterable<TaskManagerLocation> locationPreferences = 
 					task.getTaskToExecute().getVertex().getPreferredLocations();
 
-			return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout);
+			return FutureUtils.toJava(gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
index 919f6a1..23e6749 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -18,16 +18,17 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * The slot provider is responsible for preparing slots for ready-to-run tasks.
  * 
  * <p>It supports two allocating modes:
  * <ul>
  *     <li>Immediate allocating: A request for a task slot immediately gets satisfied, we can call
- *         {@link Future#getNow(Object)} to get the allocated slot.</li>
+ *         {@link CompletableFuture#getNow(Object)} to get the allocated slot.</li>
  *     <li>Queued allocating: A request for a task slot is queued and returns a future that will be
  *         fulfilled as soon as a slot becomes available.</li>
  * </ul>
@@ -41,5 +42,5 @@ public interface SlotProvider {
 	 * @param allowQueued  Whether allow the task be queued if we do not have enough resource
 	 * @return The future of the allocation
 	 */
-	Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued);
+	CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index af72d7c..5a7e819 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -37,9 +38,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -134,16 +133,16 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 
 	@Override
-	public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
 		try {
 			final Object ret = scheduleTask(task, allowQueued);
 
 			if (ret instanceof SimpleSlot) {
-				return FlinkCompletableFuture.completed((SimpleSlot) ret);
+				return CompletableFuture.completedFuture((SimpleSlot) ret);
 			}
-			else if (ret instanceof Future) {
+			else if (ret instanceof CompletableFuture) {
 				@SuppressWarnings("unchecked")
-				Future<SimpleSlot> typed = (Future<SimpleSlot>) ret;
+				CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
 				return typed;
 			}
 			else {
@@ -152,12 +151,12 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 			}
 		}
 		catch (NoResourceAvailableException e) {
-			return FlinkCompletableFuture.completedExceptionally(e);
+			return FutureUtils.completedExceptionally(e);
 		}
 	}
 
 	/**
-	 * Returns either a {@link SimpleSlot}, or a {@link Future}.
+	 * Returns either a {@link SimpleSlot}, or a {@link CompletableFuture}.
 	 */
 	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
 		if (task == null) {
@@ -320,7 +319,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				else {
 					// no resource available now, so queue the request
 					if (queueIfNoResource) {
-						CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+						CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 						this.taskQueue.add(new QueuedTask(task, future));
 						return future;
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 0785a26..bfcab87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -55,6 +55,7 @@ import org.mockito.Matchers;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -113,7 +114,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 			when(simpleSlot.getRoot()).thenReturn(rootSlot);
 			when(simpleSlot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
 
-			FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+			CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 			future.complete(simpleSlot);
 			when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index c2eea5c..d3086a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -52,13 +51,13 @@ import org.junit.After;
 import org.junit.Test;
 
 import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.mockito.verification.Timeout;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -110,8 +109,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final JobID jobId = new JobID();
 		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
 
-		final FlinkCompletableFuture<SimpleSlot> sourceFuture = new FlinkCompletableFuture<>();
-		final FlinkCompletableFuture<SimpleSlot> targetFuture = new FlinkCompletableFuture<>();
+		final CompletableFuture<SimpleSlot> sourceFuture = new CompletableFuture<>();
+		final CompletableFuture<SimpleSlot> targetFuture = new CompletableFuture<>();
 
 		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
 		slotProvider.addSlot(sourceVertex.getID(), 0, sourceFuture);
@@ -178,9 +177,9 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
 
 		@SuppressWarnings({"unchecked", "rawtypes"})
-		final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism];
+		final CompletableFuture<SimpleSlot>[] sourceFutures = new CompletableFuture[parallelism];
 		@SuppressWarnings({"unchecked", "rawtypes"})
-		final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism];
+		final CompletableFuture<SimpleSlot>[] targetFutures = new CompletableFuture[parallelism];
 
 		//
 		//  Create the slots, futures, and the slot provider
@@ -198,8 +197,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 			sourceSlots[i] = createSlot(sourceTaskManagers[i], jobId);
 			targetSlots[i] = createSlot(targetTaskManagers[i], jobId);
 
-			sourceFutures[i] = new FlinkCompletableFuture<>();
-			targetFutures[i] = new FlinkCompletableFuture<>();
+			sourceFutures[i] = new CompletableFuture<>();
+			targetFutures[i] = new CompletableFuture<>();
 		}
 
 		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
@@ -284,16 +283,16 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
 
 		@SuppressWarnings({"unchecked", "rawtypes"})
-		final FlinkCompletableFuture<SimpleSlot>[] sourceFutures = new FlinkCompletableFuture[parallelism];
+		final CompletableFuture<SimpleSlot>[] sourceFutures = new CompletableFuture[parallelism];
 		@SuppressWarnings({"unchecked", "rawtypes"})
-		final FlinkCompletableFuture<SimpleSlot>[] targetFutures = new FlinkCompletableFuture[parallelism];
+		final CompletableFuture<SimpleSlot>[] targetFutures = new CompletableFuture[parallelism];
 
 		for (int i = 0; i < parallelism; i++) {
 			sourceSlots[i] = createSlot(taskManager, jobId, slotOwner);
 			targetSlots[i] = createSlot(taskManager, jobId, slotOwner);
 
-			sourceFutures[i] = new FlinkCompletableFuture<>();
-			targetFutures[i] = new FlinkCompletableFuture<>();
+			sourceFutures[i] = new CompletableFuture<>();
+			targetFutures[i] = new CompletableFuture<>();
 		}
 
 		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
@@ -359,11 +358,11 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
 		final SimpleSlot[] slots = new SimpleSlot[parallelism];
 		@SuppressWarnings({"unchecked", "rawtypes"})
-		final FlinkCompletableFuture<SimpleSlot>[] slotFutures = new FlinkCompletableFuture[parallelism];
+		final CompletableFuture<SimpleSlot>[] slotFutures = new CompletableFuture[parallelism];
 
 		for (int i = 0; i < parallelism; i++) {
 			slots[i] = createSlot(taskManager, jobId, slotOwner);
-			slotFutures[i] = new FlinkCompletableFuture<>();
+			slotFutures[i] = new CompletableFuture<>();
 		}
 
 		ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
@@ -393,7 +392,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		//  verify that no deployments have happened
 		verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
 
-		for (Future<SimpleSlot> future : slotFutures) {
+		for (CompletableFuture<SimpleSlot> future : slotFutures) {
 			if (future.isDone()) {
 				assertTrue(future.get().isCanceled());
 			}
@@ -435,17 +434,13 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 			createSlot(taskManager, jobId, recycler)));
 
 		when(slotProvider.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
-			new Answer<Future<SimpleSlot>>() {
-
-				@Override
-				public Future<SimpleSlot> answer(InvocationOnMock invocation) {
+			(InvocationOnMock invocation) -> {
 					if (availableSlots.isEmpty()) {
 						throw new TestRuntimeException();
 					} else {
-						return FlinkCompletableFuture.completed(availableSlots.remove(0));
+						return CompletableFuture.completedFuture(availableSlots.remove(0));
 					}
-				}
-			});
+				});
 
 		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
 		final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
@@ -513,17 +508,13 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		final SlotProvider slotProvider = mock(SlotProvider.class);
 
 		when(slotProvider.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
-			new Answer<Future<SimpleSlot>>() {
-
-				@Override
-				public Future<SimpleSlot> answer(InvocationOnMock invocation) {
+			(InvocationOnMock invocation) -> {
 					if (availableSlots.isEmpty()) {
 						throw new TestRuntimeException();
 					} else {
-						return FlinkCompletableFuture.completed(availableSlots.remove(0));
+						return CompletableFuture.completedFuture(availableSlots.remove(0));
 					}
-				}
-			});
+				});
 
 		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 1b029e8..4eac4aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
@@ -33,6 +32,8 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 import org.mockito.Matchers;
 
+import java.util.concurrent.CompletableFuture;
+
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
 import static org.junit.Assert.assertEquals;
@@ -59,7 +60,7 @@ public class ExecutionVertexSchedulingTest {
 			assertTrue(slot.isReleased());
 
 			Scheduler scheduler = mock(Scheduler.class);
-			FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+			CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 			future.complete(slot);
 			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
 
@@ -90,7 +91,7 @@ public class ExecutionVertexSchedulingTest {
 			slot.releaseSlot();
 			assertTrue(slot.isReleased());
 
-			final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+			final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 
 			Scheduler scheduler = mock(Scheduler.class);
 			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
@@ -125,7 +126,7 @@ public class ExecutionVertexSchedulingTest {
 			final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
 
 			Scheduler scheduler = mock(Scheduler.class);
-			FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+			CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 			future.complete(slot);
 			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
index 3acb2eb..fef6aaa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -26,6 +25,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -36,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 class ProgrammedSlotProvider implements SlotProvider {
 
-	private final Map<JobVertexID, Future<SimpleSlot>[]> slotFutures = new HashMap<>();
+	private final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> slotFutures = new HashMap<>();
 
 	private final int parallelism;
 
@@ -45,15 +45,15 @@ class ProgrammedSlotProvider implements SlotProvider {
 		this.parallelism = parallelism;
 	}
 
-	public void addSlot(JobVertexID vertex, int subtaskIndex, Future<SimpleSlot> future) {
+	public void addSlot(JobVertexID vertex, int subtaskIndex, CompletableFuture<SimpleSlot> future) {
 		checkNotNull(vertex);
 		checkNotNull(future);
 		checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism);
 
-		Future<SimpleSlot>[] futures = slotFutures.get(vertex);
+		CompletableFuture<SimpleSlot>[] futures = slotFutures.get(vertex);
 		if (futures == null) {
 			@SuppressWarnings("unchecked")
-			Future<SimpleSlot>[] newArray = (Future<SimpleSlot>[]) new Future<?>[parallelism];
+			CompletableFuture<SimpleSlot>[] newArray = (CompletableFuture<SimpleSlot>[]) new CompletableFuture<?>[parallelism];
 			futures = newArray;
 			slotFutures.put(vertex, futures);
 		}
@@ -61,7 +61,7 @@ class ProgrammedSlotProvider implements SlotProvider {
 		futures[subtaskIndex] = future;
 	}
 
-	public void addSlots(JobVertexID vertex, Future<SimpleSlot>[] futures) {
+	public void addSlots(JobVertexID vertex, CompletableFuture<SimpleSlot>[] futures) {
 		checkNotNull(vertex);
 		checkNotNull(futures);
 		checkArgument(futures.length == parallelism);
@@ -70,13 +70,13 @@ class ProgrammedSlotProvider implements SlotProvider {
 	}
 
 	@Override
-	public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
 		JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
 		int subtask = task.getTaskToExecute().getParallelSubtaskIndex();
 
-		Future<SimpleSlot>[] forTask = slotFutures.get(vertexId);
+		CompletableFuture<SimpleSlot>[] forTask = slotFutures.get(vertexId);
 		if (forTask != null) {
-			Future<SimpleSlot> future = forTask[subtask];
+			CompletableFuture<SimpleSlot> future = forTask[subtask];
 			if (future != null) {
 				return future;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index 2cf1eec..be5282a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -22,8 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -36,6 +35,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.net.InetAddress;
 import java.util.ArrayDeque;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -70,7 +70,7 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 	}
 
 	@Override
-	public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
 		final AllocatedSlot slot;
 
 		synchronized (slots) {
@@ -83,10 +83,10 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 
 		if (slot != null) {
 			SimpleSlot result = new SimpleSlot(slot, this, 0);
-			return FlinkCompletableFuture.completed(result);
+			return CompletableFuture.completedFuture(result);
 		}
 		else {
-			return FlinkCompletableFuture.completedExceptionally(new NoResourceAvailableException());
+			return FutureUtils.completedExceptionally(new NoResourceAvailableException());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/32bc67e6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 643efae..a05c1a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -31,6 +29,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -197,7 +196,7 @@ public class SchedulerIsolatedTasksTest {
 			final int totalSlots = scheduler.getNumberOfAvailableSlots();
 
 			// all slots we ever got.
-			List<Future<SimpleSlot>> allAllocatedSlots = new ArrayList<>();
+			List<CompletableFuture<SimpleSlot>> allAllocatedSlots = new ArrayList<>();
 
 			// slots that need to be released
 			final Set<SimpleSlot> toRelease = new HashSet<SimpleSlot>();
@@ -236,16 +235,15 @@ public class SchedulerIsolatedTasksTest {
 			disposeThread.start();
 
 			for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) {
-				Future<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
-				future.thenAcceptAsync(new AcceptFunction<SimpleSlot>() {
-					@Override
-					public void accept(SimpleSlot slot) {
+				CompletableFuture<SimpleSlot> future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true);
+				future.thenAcceptAsync(
+					(SimpleSlot slot) -> {
 						synchronized (toRelease) {
 							toRelease.add(slot);
 							toRelease.notifyAll();
 						}
-					}
-				}, TestingUtils.defaultExecutionContext());
+					},
+					TestingUtils.defaultExecutionContext());
 				allAllocatedSlots.add(future);
 			}
 
@@ -254,7 +252,7 @@ public class SchedulerIsolatedTasksTest {
 			assertFalse("The slot releasing thread caused an error.", errored.get());
 
 			List<SimpleSlot> slotsAfter = new ArrayList<SimpleSlot>();
-			for (Future<SimpleSlot> future : allAllocatedSlots) {
+			for (CompletableFuture<SimpleSlot> future : allAllocatedSlots) {
 				slotsAfter.add(future.get());
 			}