You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/03 11:49:04 UTC

[2/3] flink git commit: [FLINK-7334] [futures] Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index d4afdbd..8084154 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskmanager.Task;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * {@link TaskExecutor} RPC gateway interface
@@ -48,7 +48,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param resourceManagerLeaderId current leader id of the ResourceManager
 	 * @return answer to the slot request
 	 */
-	Future<Acknowledge> requestSlot(
+	CompletableFuture<Acknowledge> requestSlot(
 		SlotID slotId,
 		JobID jobId,
 		AllocationID allocationId,
@@ -64,7 +64,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param timeout of the submit operation
 	 * @return Future acknowledge of the successful operation
 	 */
-	Future<Acknowledge> submitTask(
+	CompletableFuture<Acknowledge> submitTask(
 		TaskDeploymentDescriptor tdd,
 		UUID leaderId,
 		@RpcTimeout Time timeout);
@@ -77,7 +77,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param timeout for the update partitions operation
 	 * @return Future acknowledge if the partitions have been successfully updated
 	 */
-	Future<Acknowledge> updatePartitions(
+	CompletableFuture<Acknowledge> updatePartitions(
 		ExecutionAttemptID executionAttemptID,
 		Iterable<PartitionInfo> partitionInfos,
 		@RpcTimeout Time timeout);
@@ -99,7 +99,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param checkpointOptions for performing the checkpoint
 	 * @return Future acknowledge if the checkpoint has been successfully triggered
 	 */
-	Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
+	CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
 
 	/**
 	 * Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
@@ -110,7 +110,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
 	 * @return Future acknowledge if the checkpoint has been successfully confirmed
 	 */
-	Future<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
+	CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
 
 	/**
 	 * Stop the given task.
@@ -119,7 +119,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param timeout for the stop operation
 	 * @return Future acknowledge if the task is successfully stopped
 	 */
-	Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
+	CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
 
 	/**
 	 * Cancel the given task.
@@ -128,7 +128,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param timeout for the cancel operation
 	 * @return Future acknowledge if the task is successfully canceled
 	 */
-	Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
+	CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
 
 	/**
 	 * Heartbeat request from the job manager

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 4f91166..4084d67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationConnectionListener;
@@ -156,7 +155,7 @@ public class TaskExecutorToResourceManagerConnection
 				ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
 
 			Time timeout = Time.milliseconds(timeoutMillis);
-			return FutureUtils.toJava(resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout));
+			return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 78b49ef..b077b76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -52,6 +51,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -162,7 +162,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 	}
 
 	// export the termination future for caller to know it is terminated
-	public Future<Void> getTerminationFuture() {
+	public CompletableFuture<Void> getTerminationFuture() {
 		return taskManager.getTerminationFuture();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
index 3b9da48..a919c78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskexecutor.rpc;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -32,6 +31,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 public class RpcInputSplitProvider implements InputSplitProvider {
 	private final UUID jobMasterLeaderId;
@@ -61,7 +61,7 @@ public class RpcInputSplitProvider implements InputSplitProvider {
 	public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
 		Preconditions.checkNotNull(userCodeClassLoader);
 
-		Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
+		CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
 				jobMasterLeaderId, jobVertexID, executionAttemptID);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
index 07d04e6..26e1b0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.taskexecutor.rpc;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -46,6 +45,6 @@ public class RpcPartitionStateChecker implements PartitionProducerStateChecker {
 			IntermediateDataSetID resultId,
 			ResultPartitionID partitionId) {
 
-		return FutureUtils.toJava(jobMasterGateway.requestPartitionState(jobMasterLeaderId, resultId, partitionId));
+		return jobMasterGateway.requestPartitionState(jobMasterLeaderId, resultId, partitionId);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
index cf01d5a..d898562 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.taskexecutor.rpc;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -32,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 public class RpcResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
@@ -55,18 +54,17 @@ public class RpcResultPartitionConsumableNotifier implements ResultPartitionCons
 	}
 	@Override
 	public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
-		Future<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(
+		CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(
 				jobMasterLeaderId, partitionId, timeout);
 
-		acknowledgeFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-			@Override
-			public Void apply(Throwable value) {
-				LOG.error("Could not schedule or update consumers at the JobManager.", value);
+		acknowledgeFuture.whenCompleteAsync(
+			(Acknowledge ack, Throwable throwable) -> {
+				if (throwable != null) {
+					LOG.error("Could not schedule or update consumers at the JobManager.", throwable);
 
-				taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", value));
-
-				return null;
-			}
-		}, executor);
+					taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable));
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a6712ad..ef4fa86 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -22,7 +22,7 @@ import java.io.IOException
 import java.net._
 import java.util.UUID
 import java.util.concurrent.{Future => JavaFuture, _}
-import java.util.function.BiFunction
+import java.util.function.{BiFunction, Consumer}
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
@@ -1105,17 +1105,18 @@ class JobManager(
 
       val originalSender = new AkkaActorGateway(sender(), leaderSessionID.orNull)
 
-      val sendingFuture = stackTraceFuture.thenAccept(new AcceptFunction[StackTrace] {
-        override def accept(value: StackTrace): Unit = {
-          originalSender.tell(value)
-        }
-      })
+      val sendingFuture = stackTraceFuture.thenAccept(
+        new Consumer[StackTrace]() {
+          override def accept(value: StackTrace): Unit = {
+            originalSender.tell(value)
+          }
+        })
 
-      sendingFuture.exceptionally(new ApplyFunction[Throwable, Void] {
+      sendingFuture.exceptionally(new java.util.function.Function[Throwable, Void] {
         override def apply(value: Throwable): Void = {
           log.info("Could not send requested stack trace.", value)
 
-          return null
+          null
         }
       })
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
index 09e829e..37a4547 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.akka;
 
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -43,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -125,7 +123,7 @@ public class QuarantineMonitorTest extends TestLogger {
 			// start watching the watchee
 			watcher.tell(new Watch(watcheeAddress), ActorRef.noSender());
 
-			Future<String> quarantineFuture = quarantineHandler.getWasQuarantinedByFuture();
+			CompletableFuture<String> quarantineFuture = quarantineHandler.getWasQuarantinedByFuture();
 
 			Assert.assertEquals(actorSystem1Address.toString(), quarantineFuture.get());
 		} finally {
@@ -166,7 +164,7 @@ public class QuarantineMonitorTest extends TestLogger {
 			// start watching the watchee
 			watcher.tell(new Watch(watcheeAddress), ActorRef.noSender());
 
-			Future<String> quarantineFuture = quarantineHandler.getHasQuarantinedFuture();
+			CompletableFuture<String> quarantineFuture = quarantineHandler.getHasQuarantinedFuture();
 
 			Assert.assertEquals(actorSystem1Address.toString(), quarantineFuture.get());
 		} finally {
@@ -182,8 +180,8 @@ public class QuarantineMonitorTest extends TestLogger {
 		private final CompletableFuture<String> hasQuarantinedFuture;
 
 		public TestingQuarantineHandler() {
-			this.wasQuarantinedByFuture = new FlinkCompletableFuture<>();
-			this.hasQuarantinedFuture = new FlinkCompletableFuture<>();
+			this.wasQuarantinedByFuture = new CompletableFuture<>();
+			this.hasQuarantinedFuture = new CompletableFuture<>();
 		}
 
 		@Override
@@ -196,11 +194,11 @@ public class QuarantineMonitorTest extends TestLogger {
 			hasQuarantinedFuture.complete(remoteSystem);
 		}
 
-		public Future<String> getWasQuarantinedByFuture() {
+		public CompletableFuture<String> getWasQuarantinedByFuture() {
 			return wasQuarantinedByFuture;
 		}
 
-		public Future<String> getHasQuarantinedFuture() {
+		public CompletableFuture<String> getHasQuarantinedFuture() {
 			return hasQuarantinedFuture;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 5df5c58..e23f6a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -339,10 +338,10 @@ public class CheckpointCoordinatorMasterHooksTest {
 		final MasterTriggerRestoreHook<Void> hook = mockGeneric(MasterTriggerRestoreHook.class);
 		when(hook.getIdentifier()).thenReturn(id);
 		when(hook.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class))).thenAnswer(
-				new Answer<Future<Void>>() {
+				new Answer<CompletableFuture<Void>>() {
 
 					@Override
-					public Future<Void> answer(InvocationOnMock invocation) throws Throwable {
+					public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
 						assertEquals(1, cc.getNumberOfPendingCheckpoints());
 
 						long checkpointId = (Long) invocation.getArguments()[0];

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index f1bc43b..e1144c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
 import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
 import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
@@ -71,6 +70,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -524,7 +524,7 @@ public class ResourceManagerTest extends TestLogger {
 
 			final SlotReport slotReport = new SlotReport();
 			// test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time
-			Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(
+			CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(
 				rmLeaderSessionId,
 				taskManagerAddress,
 				taskManagerResourceID,
@@ -622,7 +622,7 @@ public class ResourceManagerTest extends TestLogger {
 			rmLeaderElectionService.isLeader(rmLeaderId);
 
 			// test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time
-			Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
+			CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
 				rmLeaderId,
 				jmLeaderId,
 				jmResourceId,

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index a7a86c3..267f10b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
@@ -42,6 +41,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.concurrent.CompletableFuture;
+
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -86,7 +87,7 @@ public class DispatcherTest extends TestLogger {
 
 			DispatcherGateway dispatcherGateway = dispatcher.getSelf();
 
-			Future<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+			CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
 
 			acknowledgeFuture.get();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index bfcab87..f4e8b30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -120,7 +119,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 			when(rootSlot.getSlotNumber()).thenReturn(0);
 
-			when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+			when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 			TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index d3086a8..b88a928 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -576,7 +575,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 	private static TaskManagerGateway createTaskManager() {
 		TaskManagerGateway tm = mock(TaskManagerGateway.class);
 		when(tm.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
-				.thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+				.thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		return tm;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
index effe417..de9081b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.StoppingException;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -37,6 +36,8 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.concurrent.CompletableFuture;
+
 import static org.junit.Assert.assertEquals;
 
 import static org.junit.Assert.fail;
@@ -155,9 +156,9 @@ public class ExecutionGraphStopTest extends TestLogger {
 
 		final TaskManagerGateway gateway = mock(TaskManagerGateway.class);
 		when(gateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
-				.thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+				.thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 		when(gateway.stopTask(any(ExecutionAttemptID.class), any(Time.class)))
-				.thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+				.thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
index f453d20..be68532 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
@@ -19,14 +19,15 @@
 package org.apache.flink.runtime.executiongraph.utils;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.messages.Acknowledge;
 
+import java.util.concurrent.CompletableFuture;
+
 public class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
 
 	@Override
-	public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
-		return new FlinkCompletableFuture<>();
+	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return new CompletableFuture<>();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 6e67c1a..b968d39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * A TaskManagerGateway that simply acks the basic operations (deploy, cancel, update) and does not
@@ -56,39 +56,39 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 	public void stopCluster(ApplicationStatus applicationStatus, String message) {}
 
 	@Override
-	public Future<StackTrace> requestStackTrace(Time timeout) {
-		return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+	public CompletableFuture<StackTrace> requestStackTrace(Time timeout) {
+		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
 
 	@Override
-	public Future<StackTraceSampleResponse> requestStackTraceSample(
+	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
 			ExecutionAttemptID executionAttemptID,
 			int sampleId,
 			int numSamples,
 			Time delayBetweenSamples,
 			int maxStackTraceDepth,
 			Time timeout) {
-		return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
 
 	@Override
-	public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
-		return FlinkCompletableFuture.completed(Acknowledge.get());
+	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	@Override
-	public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
-		return FlinkCompletableFuture.completed(Acknowledge.get());
+	public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	@Override
-	public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
-		return FlinkCompletableFuture.completed(Acknowledge.get());
+	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	@Override
-	public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
-		return FlinkCompletableFuture.completed(Acknowledge.get());
+	public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	@Override
@@ -110,12 +110,12 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 			CheckpointOptions checkpointOptions) {}
 
 	@Override
-	public Future<BlobKey> requestTaskManagerLog(Time timeout) {
-		return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+	public CompletableFuture<BlobKey> requestTaskManagerLog(Time timeout) {
+		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
 
 	@Override
-	public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
-		return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+	public CompletableFuture<BlobKey> requestTaskManagerStdout(Time timeout) {
+		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index b444640..4cc4f11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -35,6 +34,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -82,7 +82,7 @@ public class SlotPoolRpcTest {
 		);
 		pool.start(UUID.randomUUID(), "foobar");
 
-		Future<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
 
 		try {
 			future.get(4, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index cf95461..3e2293b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -41,6 +40,7 @@ import org.mockito.ArgumentCaptor;
 
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
@@ -85,7 +85,7 @@ public class SlotPoolTest extends TestLogger {
 		this.resourceManagerGateway = mock(ResourceManagerGateway.class);
 		when(resourceManagerGateway
 			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
-			.thenReturn(mock(Future.class, RETURNS_MOCKS));
+			.thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 
 		slotPool.connectToResourceManager(UUID.randomUUID(), resourceManagerGateway);
 	}
@@ -101,7 +101,7 @@ public class SlotPoolTest extends TestLogger {
 		slotPool.registerTaskManager(resourceID);
 
 		ScheduledUnit task = mock(ScheduledUnit.class);
-		Future<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null);
 		assertFalse(future.isDone());
 
 		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -126,8 +126,8 @@ public class SlotPoolTest extends TestLogger {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);
 
-		Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
-		Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
 
 		assertFalse(future1.isDone());
 		assertFalse(future2.isDone());
@@ -165,7 +165,7 @@ public class SlotPoolTest extends TestLogger {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);
 
-		Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
 		assertFalse(future1.isDone());
 
 		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -182,7 +182,7 @@ public class SlotPoolTest extends TestLogger {
 		// return this slot to pool
 		slot1.releaseSlot();
 
-		Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
 
 		// second allocation fulfilled by previous slot returning
 		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
@@ -200,7 +200,7 @@ public class SlotPoolTest extends TestLogger {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);
 
-		Future<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
 		assertFalse(future.isDone());
 
 		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -241,14 +241,14 @@ public class SlotPoolTest extends TestLogger {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);
 
-		Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
 
 		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
 		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
 
 		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
-		Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
 
 		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
 		assertTrue(slotPool.offerSlot(allocatedSlot));

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 0b25e6c..48a1d45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -34,7 +33,6 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -50,6 +48,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -177,7 +176,7 @@ public class JobMasterTest extends TestLogger {
 			anyString(),
 			any(JobID.class),
 			any(Time.class)
-		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JobMasterRegistrationSuccess(
+		)).thenReturn(CompletableFuture.completedFuture(new JobMasterRegistrationSuccess(
 			heartbeatInterval, rmLeaderId, rmResourceId)));
 
 		final TestingSerialRpcService rpc = new TestingSerialRpcService();

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 9a4917a..da992bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.registration;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.util.TestLogger;
@@ -123,8 +122,8 @@ public class RetryingRegistrationTest extends TestLogger {
 			// RPC service that fails upon the first connection, but succeeds on the second
 			RpcService rpc = mock(RpcService.class);
 			when(rpc.connect(anyString(), any(Class.class))).thenReturn(
-					FlinkCompletableFuture.completedExceptionally(new Exception("test connect failure")),  // first connection attempt fails
-					FlinkCompletableFuture.completed(testGateway)                         // second connection attempt succeeds
+					FutureUtils.completedExceptionally(new Exception("test connect failure")),  // first connection attempt fails
+					CompletableFuture.completedFuture(testGateway)                         // second connection attempt succeeds
 			);
 			when(rpc.getExecutor()).thenReturn(executor);
 
@@ -245,8 +244,8 @@ public class RetryingRegistrationTest extends TestLogger {
 			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
 
 			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(
-					FlinkCompletableFuture.<RegistrationResponse>completedExceptionally(new Exception("test exception")),
-					FlinkCompletableFuture.<RegistrationResponse>completed(new TestRegistrationSuccess(testId)));
+					FutureUtils.completedExceptionally(new Exception("test exception")),
+					CompletableFuture.completedFuture(new TestRegistrationSuccess(testId)));
 
 			rpc.registerGateway(testEndpointAddress, testGateway);
 
@@ -281,7 +280,7 @@ public class RetryingRegistrationTest extends TestLogger {
 		TestingRpcService rpc = new TestingRpcService();
 
 		try {
-			FlinkCompletableFuture<RegistrationResponse> result = new FlinkCompletableFuture<>();
+			CompletableFuture<RegistrationResponse> result = new CompletableFuture<>();
 
 			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
 			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result);
@@ -340,7 +339,7 @@ public class RetryingRegistrationTest extends TestLogger {
 		@Override
 		protected CompletableFuture<RegistrationResponse> invokeRegistration(
 				TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
-			return FutureUtils.toJava(gateway.registrationCall(leaderId, timeoutMillis));
+			return gateway.registrationCall(leaderId, timeoutMillis);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
index 1b23fa3..4cfbc12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.runtime.registration;
 
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.TestingGatewayBase;
 import org.apache.flink.util.Preconditions;
 
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
@@ -47,7 +46,7 @@ public class TestRegistrationGateway extends TestingGatewayBase {
 
 	// ------------------------------------------------------------------------
 
-	public Future<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
+	public CompletableFuture<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
 		invocations.add(new RegistrationCall(leaderId, timeout));
 
 		RegistrationResponse response = responses[pos];
@@ -56,7 +55,7 @@ public class TestRegistrationGateway extends TestingGatewayBase {
 		}
 
 		// return a completed future (for a proper value), or one that never completes and will time out (for null)
-		return response != null ? FlinkCompletableFuture.completed(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
+		return response != null ? CompletableFuture.completedFuture(response) : futureWithTimeout(timeout);
 	}
 
 	public BlockingQueue<RegistrationCall> getInvocations() {

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
index 4d5964e..7b8703e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
@@ -34,6 +33,7 @@ import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -83,7 +83,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
 
 		jobLeaderIdService.addJob(jobId);
 
-		Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+		CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
 
 		// notify the leader id service about the new leader
 		leaderRetrievalService.notifyListener(address, leaderId);
@@ -117,7 +117,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
 
 		jobLeaderIdService.addJob(jobId);
 
-		Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+		CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
 
 		// remove the job before we could find a leader
 		jobLeaderIdService.removeJob(jobId);
@@ -228,7 +228,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
 
 		jobLeaderIdService.addJob(jobId);
 
-		Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+		CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
 
 		// notify the leader id service about the new leader
 		leaderRetrievalService.notifyListener(address, leaderId);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 4836f74..6480d75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -43,6 +42,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertTrue;
@@ -74,11 +74,11 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
 		// test response successful
-		Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
 			rmLeaderSessionId,
 			jmLeaderID,
 			jmResourceId,
@@ -104,12 +104,12 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
-		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
 			differentLeaderSessionID,
 			jmLeaderID,
 			jmResourceId,
@@ -134,14 +134,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			"localhost",
 			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
-		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
 			rmLeaderSessionId,
 			differentLeaderSessionID,
 			jmResourceId,
@@ -166,14 +166,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			"localhost",
 			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		// test throw exception when receive a registration from job master which takes invalid address
 		String invalidAddress = "/jobMasterAddress2";
-		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(
 			rmLeaderSessionId,
 			jmLeaderSessionId,
 			jmResourceId,
@@ -198,14 +198,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			"localhost",
 			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		JobID unknownJobIDToHAServices = new JobID();
 		// verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener
-		Future<RegistrationResponse> declineFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> declineFuture = resourceManager.registerJobManager(
 			rmLeaderSessionId,
 			jmLeaderSessionId,
 			jmResourceId,

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 85b7eb4..4127cea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -41,6 +40,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertNotEquals;
@@ -89,13 +89,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	public void testRegisterTaskExecutor() throws Exception {
 		try {
 			// test response successful
-			Future<RegistrationResponse> successfulFuture =
+			CompletableFuture<RegistrationResponse> successfulFuture =
 				resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
 			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 
 			// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
-			Future<RegistrationResponse> duplicateFuture =
+			CompletableFuture<RegistrationResponse> duplicateFuture =
 				resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
 			RegistrationResponse duplicateResponse = duplicateFuture.get();
 			assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
@@ -115,7 +115,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
 			UUID differentLeaderSessionID = UUID.randomUUID();
-			Future<RegistrationResponse> unMatchedLeaderFuture =
+			CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
 				resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport);
 			assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 		} finally {
@@ -133,7 +133,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test throw exception when receive a registration from taskExecutor which takes invalid address
 			String invalidAddress = "/taskExecutor2";
-			Future<RegistrationResponse> invalidAddressFuture =
+			CompletableFuture<RegistrationResponse> invalidAddressFuture =
 				resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport);
 			assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 		} finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 39c5f25..93e96a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
-import org.apache.flink.runtime.concurrent.*;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -44,7 +44,7 @@ import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
 import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -114,7 +114,7 @@ public class SlotManagerTest extends TestLogger {
 			any(AllocationID.class),
 			anyString(),
 			eq(leaderId),
-			any(Time.class))).thenReturn(new FlinkCompletableFuture<Acknowledge>());
+			any(Time.class))).thenReturn(new CompletableFuture<>());
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -241,7 +241,7 @@ public class SlotManagerTest extends TestLogger {
 				eq(allocationId),
 				anyString(),
 				eq(leaderId),
-				any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+				any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 			final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -280,7 +280,7 @@ public class SlotManagerTest extends TestLogger {
 			any(AllocationID.class),
 			anyString(),
 			eq(leaderId),
-			any(Time.class))).thenReturn(new FlinkCompletableFuture<Acknowledge>());
+			any(Time.class))).thenReturn(new CompletableFuture<>());
 
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
@@ -338,7 +338,7 @@ public class SlotManagerTest extends TestLogger {
 			eq(allocationId),
 			anyString(),
 			eq(leaderId),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -482,7 +482,7 @@ public class SlotManagerTest extends TestLogger {
 			any(AllocationID.class),
 			anyString(),
 			eq(leaderId),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -527,7 +527,7 @@ public class SlotManagerTest extends TestLogger {
 			any(AllocationID.class),
 			anyString(),
 			eq(leaderId),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -747,8 +747,8 @@ public class SlotManagerTest extends TestLogger {
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
 		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
-		final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = new FlinkCompletableFuture<>();
-		final FlinkCompletableFuture<Acknowledge> slotRequestFuture2 = new FlinkCompletableFuture<>();
+		final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
+		final CompletableFuture<Acknowledge> slotRequestFuture2 = new CompletableFuture<>();
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 		when(taskExecutorGateway.requestSlot(
@@ -826,7 +826,7 @@ public class SlotManagerTest extends TestLogger {
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
 		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
-		final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = new FlinkCompletableFuture<>();
+		final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 		when(taskExecutorGateway.requestSlot(
@@ -835,7 +835,7 @@ public class SlotManagerTest extends TestLogger {
 			eq(allocationId),
 			anyString(),
 			any(UUID.class),
-			any(Time.class))).thenReturn(slotRequestFuture1, FlinkCompletableFuture.completed(Acknowledge.get()));
+			any(Time.class))).thenReturn(slotRequestFuture1, CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -856,24 +856,21 @@ public class SlotManagerTest extends TestLogger {
 
 			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
 
-			Future<Void> registrationFuture = FlinkFuture.supplyAsync(new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
+			CompletableFuture<Void> registrationFuture = CompletableFuture.supplyAsync(
+				() -> {
 					slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
 					return null;
-				}
-			}, mainThreadExecutor)
-			.thenAccept(new AcceptFunction<Void>() {
-				@Override
-				public void accept(Void value) {
+				},
+				mainThreadExecutor)
+			.thenAccept(
+				(Object value) -> {
 					try {
 						slotManager.registerSlotRequest(slotRequest);
 					} catch (SlotManagerException e) {
 						throw new RuntimeException("Could not register slots.", e);
 					}
-				}
-			});
+				});
 
 			// check that no exception has been thrown
 			registrationFuture.get();
@@ -891,12 +888,9 @@ public class SlotManagerTest extends TestLogger {
 			final SlotID requestedSlotId = slotIdCaptor.getValue();
 			final SlotID freeSlotId = requestedSlotId.equals(slotId1) ? slotId2 : slotId1;
 
-			Future<Boolean> freeSlotFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() {
-				@Override
-				public Boolean call() throws Exception {
-					return slotManager.getSlot(freeSlotId).isFree();
-				}
-			}, mainThreadExecutor);
+			CompletableFuture<Boolean> freeSlotFuture = CompletableFuture.supplyAsync(
+				() -> slotManager.getSlot(freeSlotId).isFree(),
+				mainThreadExecutor);
 
 			assertTrue(freeSlotFuture.get());
 
@@ -904,15 +898,10 @@ public class SlotManagerTest extends TestLogger {
 			final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
 			final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
 
-			FlinkFuture.supplyAsync(new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
-					// this should update the slot with the pending slot request triggering the reassignment of it
-					slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport);
-
-					return null;
-				}
-			}, mainThreadExecutor);
+			CompletableFuture.supplyAsync(
+				// this should update the slot with the pending slot request triggering the reassignment of it
+				() -> slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport),
+				mainThreadExecutor);
 
 			verify(taskExecutorGateway, timeout(verifyTimeout).times(2)).requestSlot(
 				slotIdCaptor.capture(),
@@ -926,12 +915,9 @@ public class SlotManagerTest extends TestLogger {
 
 			assertEquals(slotId2, requestedSlotId2);
 
-			Future<TaskManagerSlot> requestedSlotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
-				@Override
-				public TaskManagerSlot call() throws Exception {
-					return slotManager.getSlot(requestedSlotId2);
-				}
-			}, mainThreadExecutor);
+			CompletableFuture<TaskManagerSlot> requestedSlotFuture = CompletableFuture.supplyAsync(
+				() -> slotManager.getSlot(requestedSlotId2),
+				mainThreadExecutor);
 
 			TaskManagerSlot slot = requestedSlotFuture.get();
 
@@ -967,7 +953,7 @@ public class SlotManagerTest extends TestLogger {
 			eq(allocationId),
 			anyString(),
 			eq(leaderId),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -987,20 +973,16 @@ public class SlotManagerTest extends TestLogger {
 
 			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
 
-			FlinkFuture.supplyAsync(new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
-					slotManager.registerSlotRequest(slotRequest);
-
-					return null;
-				}
-			}, mainThreadExecutor)
-			.thenAccept(new AcceptFunction<Void>() {
-				@Override
-				public void accept(Void value) {
-					slotManager.registerTaskManager(taskManagerConnection, initialSlotReport);
-				}
-			});
+			CompletableFuture.supplyAsync(
+				() -> {
+					try {
+						return slotManager.registerSlotRequest(slotRequest);
+					} catch (SlotManagerException e) {
+						throw new FlinkFutureException(e);
+					}
+				},
+				mainThreadExecutor)
+			.thenAccept((Object value) -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
 
 			ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class);
 
@@ -1012,44 +994,28 @@ public class SlotManagerTest extends TestLogger {
 				eq(leaderId),
 				any(Time.class));
 
-			Future<Boolean> idleFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() {
-				@Override
-				public Boolean call() throws Exception {
-					return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
-				}
-			}, mainThreadExecutor);
+			CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync(
+				() -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()),
+				mainThreadExecutor);
 
 			// check that the TaskManager is not idle
 			assertFalse(idleFuture.get());
 
 			final SlotID slotId = slotIdArgumentCaptor.getValue();
 
-			Future<TaskManagerSlot> slotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
-				@Override
-				public TaskManagerSlot call() throws Exception {
-					return slotManager.getSlot(slotId);
-				}
-			}, mainThreadExecutor);
+			CompletableFuture<TaskManagerSlot> slotFuture = CompletableFuture.supplyAsync(
+				() -> slotManager.getSlot(slotId),
+				mainThreadExecutor);
 
 			TaskManagerSlot slot = slotFuture.get();
 
 			assertTrue(slot.isAllocated());
 			assertEquals(allocationId, slot.getAllocationId());
 
-			Future<Boolean> idleFuture2 = FlinkFuture.supplyAsync(new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
-					slotManager.freeSlot(slotId, allocationId);
-
-					return null;
-				}
-			}, mainThreadExecutor)
-			.thenApply(new ApplyFunction<Void, Boolean>() {
-				@Override
-				public Boolean apply(Void value) {
-					return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
-				}
-			});
+			CompletableFuture<Boolean> idleFuture2 = CompletableFuture.runAsync(
+				() -> slotManager.freeSlot(slotId, allocationId),
+				mainThreadExecutor)
+			.thenApply((Object value) -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()));
 
 			assertTrue(idleFuture2.get());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index a1ab1ab..844e159 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
@@ -41,6 +40,7 @@ import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -104,7 +104,7 @@ public class SlotProtocolTest extends TestLogger {
 			Mockito.when(
 				taskExecutorGateway
 					.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
-				.thenReturn(mock(FlinkFuture.class));
+				.thenReturn(mock(CompletableFuture.class));
 
 			final ResourceID resourceID = ResourceID.generate();
 			final SlotID slotID = new SlotID(resourceID, 0);
@@ -139,7 +139,7 @@ public class SlotProtocolTest extends TestLogger {
 		Mockito.when(
 			taskExecutorGateway
 				.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
-			.thenReturn(mock(FlinkFuture.class));
+			.thenReturn(mock(CompletableFuture.class));
 
 		try (SlotManager slotManager = new SlotManager(
 			scheduledExecutor,

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index e636d6c..4be5257 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -23,14 +23,13 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
 
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
@@ -90,9 +89,8 @@ public class AsyncCallsTest extends TestLogger {
 			});
 		}
 	
-		Future<String> result = testEndpoint.callAsync(new Callable<String>() {
-			@Override
-			public String call() throws Exception {
+		CompletableFuture<String> result = testEndpoint.callAsync(
+			() -> {
 				boolean holdsLock = lock.tryLock();
 				if (holdsLock) {
 					lock.unlock();
@@ -100,8 +98,8 @@ public class AsyncCallsTest extends TestLogger {
 					concurrentAccess.set(true);
 				}
 				return "test";
-			}
-		}, Time.seconds(30L));
+			},
+			Time.seconds(30L));
 
 		String str = result.get(30, TimeUnit.SECONDS);
 		assertEquals("test", str);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index bbde331..07dadae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.rpc;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.util.ReflectionUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -42,6 +41,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -74,7 +74,7 @@ public class RpcCompletenessTest extends TestLogger {
 
 	private static Logger LOG = LoggerFactory.getLogger(RpcCompletenessTest.class);
 
-	private static final Class<?> futureClass = Future.class;
+	private static final Class<?> futureClass = CompletableFuture.class;
 	private static final Class<?> timeoutClass = Time.class;
 
 	@Test
@@ -195,7 +195,7 @@ public class RpcCompletenessTest extends TestLogger {
 	/**
 	 * Checks whether the gateway method fulfills the gateway method requirements.
 	 * <ul>
-	 *     <li>It checks whether the return type is void or a {@link Future} wrapping the actual result. </li>
+	 *     <li>It checks whether the return type is void or a {@link CompletableFuture} wrapping the actual result. </li>
 	 *     <li>It checks that the method's parameter list contains at most one parameter annotated with {@link RpcTimeout}.</li>
 	 * </ul>
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index e05c8d8..4220fff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -23,7 +23,6 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -33,6 +32,7 @@ import org.junit.Test;
 import scala.Option;
 import scala.Tuple2;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -57,7 +57,7 @@ public class RpcConnectionTest {
 			// can only pass if the connection problem is not recognized merely via a timeout
 			rpcService = new AkkaRpcService(actorSystem, Time.of(10000000, TimeUnit.SECONDS));
 
-			Future<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
+			CompletableFuture<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
 
 			future.get(10000000, TimeUnit.SECONDS);
 			fail("should never complete normally");

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
index 03fe84b..ccf0acd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.runtime.rpc;
 
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -77,8 +74,8 @@ public abstract class TestingGatewayBase implements RpcGateway {
 	//  utilities
 	// ------------------------------------------------------------------------
 
-	public <T> Future<T> futureWithTimeout(long timeoutMillis) {
-		FlinkCompletableFuture<T> future = new FlinkCompletableFuture<>();
+	public <T> CompletableFuture<T> futureWithTimeout(long timeoutMillis) {
+		CompletableFuture<T> future = new CompletableFuture<>();
 		executor.schedule(new FutureTimeout(future), timeoutMillis, TimeUnit.MILLISECONDS);
 		return future;
 	}