You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/03 11:49:03 UTC

[1/3] flink git commit: [FLINK-7334] [futures] Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService

Repository: flink
Updated Branches:
  refs/heads/master 74e479e51 -> eddafc1ac


http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index b56bf6b..14cf35a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.rpc;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -92,20 +92,19 @@ public class TestingRpcService extends AkkaRpcService {
 	}
 
 	@Override
-	public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
+	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
 		RpcGateway gateway = registeredConnections.get(address);
 
 		if (gateway != null) {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return FlinkCompletableFuture.completed(typedGateway);
+				return CompletableFuture.completedFuture(typedGateway);
 			} else {
-				return FlinkCompletableFuture.completedExceptionally(
-					new Exception("Gateway registered under " + address + " is not of type " + clazz));
+				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
 		} else {
-			return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
+			return FutureUtils.completedExceptionally(new Exception("No gateway registered under " + address + '.'));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index ac3f40b..37349a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -20,11 +20,9 @@ package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.util.Preconditions;
 
@@ -36,10 +34,12 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -65,7 +65,7 @@ public class TestingSerialRpcService implements RpcService {
 		executorService = new DirectExecutorService();
 		scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
 		this.registeredConnections = new ConcurrentHashMap<>(16);
-		this.terminationFuture = new FlinkCompletableFuture<>();
+		this.terminationFuture = new CompletableFuture<>();
 
 		this.scheduledExecutorServiceAdapter = new ScheduledExecutorServiceAdapter(scheduledExecutorService);
 	}
@@ -88,13 +88,13 @@ public class TestingSerialRpcService implements RpcService {
 	}
 
 	@Override
-	public <T> Future<T> execute(Callable<T> callable) {
+	public <T> CompletableFuture<T> execute(Callable<T> callable) {
 		try {
 			T result = callable.call();
 
-			return FlinkCompletableFuture.completed(result);
+			return CompletableFuture.completedFuture(result);
 		} catch (Exception e) {
-			return FlinkCompletableFuture.completedExceptionally(e);
+			return FutureUtils.completedExceptionally(e);
 		}
 	}
 
@@ -134,7 +134,7 @@ public class TestingSerialRpcService implements RpcService {
 	}
 
 	@Override
-	public Future<Void> getTerminationFuture() {
+	public CompletableFuture<Void> getTerminationFuture() {
 		return terminationFuture;
 	}
 
@@ -178,20 +178,19 @@ public class TestingSerialRpcService implements RpcService {
 	}
 
 	@Override
-	public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
+	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
 		RpcGateway gateway = registeredConnections.get(address);
 
 		if (gateway != null) {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return FlinkCompletableFuture.completed(typedGateway);
+				return CompletableFuture.completedFuture(typedGateway);
 			} else {
-				return FlinkCompletableFuture.completedExceptionally(
-					new Exception("Gateway registered under " + address + " is not of type " + clazz));
+				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
 		} else {
-			return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
+			return FutureUtils.completedExceptionally(new Exception("No gateway registered under " + address + '.'));
 		}
 	}
 
@@ -251,12 +250,12 @@ public class TestingSerialRpcService implements RpcService {
 
 				Class<?> returnType = method.getReturnType();
 
-				if (returnType.equals(Future.class)) {
+				if (returnType.equals(CompletableFuture.class)) {
 					try {
 						Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
-						return FlinkCompletableFuture.completed(result);
+						return CompletableFuture.completedFuture(result);
 					} catch (Throwable e) {
-						return FlinkCompletableFuture.completedExceptionally(e);
+						return FutureUtils.completedExceptionally(e);
 					}
 				} else {
 					return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
@@ -290,11 +289,11 @@ public class TestingSerialRpcService implements RpcService {
 		}
 
 		@Override
-		public <V> Future<V> callAsync(Callable<V> callable, Time callTimeout) {
+		public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
 			try {
-				return FlinkCompletableFuture.completed(callable.call());
+				return CompletableFuture.completedFuture(callable.call());
 			} catch (Throwable e) {
-				return FlinkCompletableFuture.completedExceptionally(e);
+				return FutureUtils.completedExceptionally(e);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 0b06267..793d292 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -21,9 +21,6 @@ package org.apache.flink.runtime.rpc.akka;
 import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -37,7 +34,7 @@ import org.hamcrest.core.Is;
 import org.junit.AfterClass;
 import org.junit.Test;
 
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.junit.Assert.assertEquals;
@@ -74,7 +71,7 @@ public class AkkaRpcActorTest extends TestLogger {
 	public void testAddressResolution() throws Exception {
 		DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
 
-		Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
+		CompletableFuture<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
 
 		DummyRpcGateway rpcGateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
 
@@ -86,7 +83,7 @@ public class AkkaRpcActorTest extends TestLogger {
 	 */
 	@Test
 	public void testFailingAddressResolution() throws Exception {
-		Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
+		CompletableFuture<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
 
 		try {
 			futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
@@ -111,7 +108,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		DummyRpcGateway rpcGateway = rpcEndpoint.getSelf();
 
 		// this message should be discarded and completed with an AkkaRpcException
-		Future<Integer> result = rpcGateway.foobar();
+		CompletableFuture<Integer> result = rpcGateway.foobar();
 
 		try {
 			result.get(timeout.getSize(), timeout.getUnit());
@@ -150,14 +147,14 @@ public class AkkaRpcActorTest extends TestLogger {
 
 		rpcEndpoint.start();
 
-		Future<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
+		CompletableFuture<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
 
 		WrongRpcGateway gateway = futureGateway.get(timeout.getSize(), timeout.getUnit());
 
 		// since it is a tell operation we won't receive a RpcConnectionException, it's only logged
 		gateway.tell("foobar");
 
-		Future<Boolean> result = gateway.barfoo();
+		CompletableFuture<Boolean> result = gateway.barfoo();
 
 		try {
 			result.get(timeout.getSize(), timeout.getUnit());
@@ -178,18 +175,13 @@ public class AkkaRpcActorTest extends TestLogger {
 		final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
 		rpcEndpoint.start();
 
-		Future<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
+		CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
 
 		assertFalse(terminationFuture.isDone());
 
-		FlinkFuture.supplyAsync(new Callable<Void>() {
-			@Override
-			public Void call() throws Exception {
-				rpcEndpoint.shutDown();
-
-				return null;
-			}
-		}, actorSystem.dispatcher());
+		CompletableFuture.runAsync(
+			() -> rpcEndpoint.shutDown(),
+			actorSystem.dispatcher());
 
 		// wait until the rpc endpoint has terminated
 		terminationFuture.get();
@@ -201,7 +193,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		rpcEndpoint.start();
 
 		ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
-		Future<Integer> result = rpcGateway.doStuff();
+		CompletableFuture<Integer> result = rpcGateway.doStuff();
 
 		try {
 			result.get(timeout.getSize(), timeout.getUnit());
@@ -220,7 +212,7 @@ public class AkkaRpcActorTest extends TestLogger {
 		rpcEndpoint.start();
 
 		ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
-		Future<Integer> result = rpcGateway.doStuff();
+		CompletableFuture<Integer> result = rpcGateway.doStuff();
 
 		try {
 			result.get(timeout.getSize(), timeout.getUnit());
@@ -244,7 +236,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
 		rpcEndpoint.shutDown();
 
-		Future<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
+		CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
 
 		try {
 			terminationFuture.get();
@@ -263,7 +255,7 @@ public class AkkaRpcActorTest extends TestLogger {
 
 		simpleRpcEndpoint.shutDown();
 
-		Future<Void> terminationFuture = simpleRpcEndpoint.getTerminationFuture();
+		CompletableFuture<Void> terminationFuture = simpleRpcEndpoint.getTerminationFuture();
 
 		// check that we executed the postStop method in the main thread, otherwise an exception
 		// would be thrown here.
@@ -275,11 +267,11 @@ public class AkkaRpcActorTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private interface DummyRpcGateway extends RpcGateway {
-		Future<Integer> foobar();
+		CompletableFuture<Integer> foobar();
 	}
 
 	private interface WrongRpcGateway extends RpcGateway {
-		Future<Boolean> barfoo();
+		CompletableFuture<Boolean> barfoo();
 		void tell(String message);
 	}
 
@@ -304,7 +296,7 @@ public class AkkaRpcActorTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private interface ExceptionalGateway extends RpcGateway {
-		Future<Integer> doStuff();
+		CompletableFuture<Integer> doStuff();
 	}
 
 	private static class ExceptionalEndpoint extends RpcEndpoint<ExceptionalGateway> {
@@ -326,8 +318,8 @@ public class AkkaRpcActorTest extends TestLogger {
 		}
 
 		@RpcMethod
-		public Future<Integer> doStuff() {
-			final FlinkCompletableFuture<Integer> future = new FlinkCompletableFuture<>();
+		public CompletableFuture<Integer> doStuff() {
+			final CompletableFuture<Integer> future = new CompletableFuture<>();
 
 			// complete the future slightly in the, well, future...
 			new Thread() {

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 42f63ef..e0d1110 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -22,15 +22,14 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
@@ -91,26 +90,21 @@ public class AkkaRpcServiceTest extends TestLogger {
 	public void testExecuteRunnable() throws Exception {
 		final OneShotLatch latch = new OneShotLatch();
 
-		akkaRpcService.execute(new Runnable() {
-			@Override
-			public void run() {
-				latch.trigger();
-			}
-		});
+		akkaRpcService.execute(() -> latch.trigger());
 
 		latch.await(30L, TimeUnit.SECONDS);
 	}
 
 	/**
 	 * Tests that the {@link AkkaRpcService} can execute callables and returns their result as
-	 * a {@link Future}.
+	 * a {@link CompletableFuture}.
 	 */
 	@Test
 	public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException {
 		final OneShotLatch latch = new OneShotLatch();
 		final int expected = 42;
 
-		Future<Integer> result = akkaRpcService.execute(new Callable<Integer>() {
+		CompletableFuture<Integer> result = akkaRpcService.execute(new Callable<Integer>() {
 			@Override
 			public Integer call() throws Exception {
 				latch.trigger();
@@ -145,18 +139,11 @@ public class AkkaRpcServiceTest extends TestLogger {
 		final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
 		final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000));
 
-		Future<Void> terminationFuture = rpcService.getTerminationFuture();
+		CompletableFuture<Void> terminationFuture = rpcService.getTerminationFuture();
 
 		assertFalse(terminationFuture.isDone());
 
-		FlinkFuture.supplyAsync(new Callable<Void>() {
-			@Override
-			public Void call() throws Exception {
-				rpcService.stopService();
-
-				return null;
-			}
-		}, actorSystem.dispatcher());
+		CompletableFuture.runAsync(() -> rpcService.stopService(), actorSystem.dispatcher());
 
 		terminationFuture.get();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index d640a97..34cf412 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -23,7 +23,6 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -35,6 +34,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.junit.Assert.assertThat;
@@ -108,7 +108,7 @@ public class MessageSerializationTest extends TestLogger {
 
 		String address = testEndpoint.getAddress();
 
-		Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
+		CompletableFuture<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
 
 		TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
 
@@ -129,7 +129,7 @@ public class MessageSerializationTest extends TestLogger {
 
 		String address = testEndpoint.getAddress();
 
-		Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
+		CompletableFuture<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
 
 		TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
 
@@ -153,7 +153,7 @@ public class MessageSerializationTest extends TestLogger {
 
 		String address = testEndpoint.getAddress();
 
-		Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
+		CompletableFuture<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
 
 		TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 6a0bd87..53c435e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -26,8 +26,6 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -62,6 +60,7 @@ import org.junit.Test;
 import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -159,13 +158,13 @@ public class TaskExecutorITCase extends TestLogger {
 		JobMasterGateway jmGateway = mock(JobMasterGateway.class);
 
 		when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class)))
-			.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
+			.thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
 		when(jmGateway.getHostname()).thenReturn(jmAddress);
 		when(jmGateway.offerSlots(
 			eq(taskManagerResourceId),
 			any(Iterable.class),
 			eq(jmLeaderId),
-			any(Time.class))).thenReturn(mock(Future.class, RETURNS_MOCKS));
+			any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 
 
 		rpcService.registerGateway(rmAddress, resourceManager.getSelf());
@@ -185,7 +184,7 @@ public class TaskExecutorITCase extends TestLogger {
 			// notify the TM about the new RM leader
 			rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId);
 
-			Future<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(
+			CompletableFuture<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(
 				rmLeaderId,
 				jmLeaderId,
 				jmResourceId,

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index b596f75..a4f0e03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -28,10 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -65,7 +62,6 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
@@ -95,6 +91,7 @@ import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 
@@ -168,7 +165,7 @@ public class TaskExecutorTest extends TestLogger {
 				eq(taskManagerLocation),
 				eq(jmLeaderId),
 				any(Time.class)
-		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		)).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
 		when(jobMasterGateway.getHostname()).thenReturn("localhost");
 
@@ -230,7 +227,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(rmGateway.registerTaskExecutor(
 			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
 			.thenReturn(
-				FlinkCompletableFuture.<RegistrationResponse>completed(
+				CompletableFuture.completedFuture(
 					new TaskExecutorRegistrationSuccess(
 						new InstanceID(),
 						rmResourceId,
@@ -334,7 +331,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(rmGateway.registerTaskExecutor(
 			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
 			.thenReturn(
-				FlinkCompletableFuture.<RegistrationResponse>completed(
+				CompletableFuture.completedFuture(
 					new TaskExecutorRegistrationSuccess(
 						new InstanceID(),
 						rmResourceId,
@@ -467,7 +464,7 @@ public class TaskExecutorTest extends TestLogger {
 			ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
 			when(rmGateway.registerTaskExecutor(
 					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-				.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(
+				.thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
 					new InstanceID(), resourceManagerResourceId, 10L)));
 
 			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
@@ -539,11 +536,11 @@ public class TaskExecutorTest extends TestLogger {
 
 			when(rmGateway1.registerTaskExecutor(
 					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-					.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
+					.thenReturn(CompletableFuture.completedFuture(
 						new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
 			when(rmGateway2.registerTaskExecutor(
 					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-					.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
+					.thenReturn(CompletableFuture.completedFuture(
 						new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));
 
 			rpc.registerGateway(address1, rmGateway1);
@@ -728,7 +725,7 @@ public class TaskExecutorTest extends TestLogger {
 
 			taskManager.submitTask(tdd, jobManagerLeaderId);
 
-			Future<Boolean> completionFuture = TestInvokable.completableFuture;
+			CompletableFuture<Boolean> completionFuture = TestInvokable.completableFuture;
 
 			completionFuture.get();
 
@@ -744,7 +741,7 @@ public class TaskExecutorTest extends TestLogger {
 	 */
 	public static class TestInvokable extends AbstractInvokable {
 
-		static final CompletableFuture<Boolean> completableFuture = new FlinkCompletableFuture<>();
+		static final CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
 
 		@Override
 		public void invoke() throws Exception {
@@ -793,7 +790,7 @@ public class TaskExecutorTest extends TestLogger {
 			any(String.class),
 			eq(resourceId),
 			any(SlotReport.class),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
+			any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
 
 		final String jobManagerAddress = "jm";
 		final UUID jobManagerLeaderId = UUID.randomUUID();
@@ -807,13 +804,13 @@ public class TaskExecutorTest extends TestLogger {
 				eq(taskManagerLocation),
 				eq(jobManagerLeaderId),
 				any(Time.class)
-		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		)).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 		when(jobMasterGateway.offerSlots(
 			any(ResourceID.class),
 			any(Iterable.class),
 			any(UUID.class),
-			any(Time.class))).thenReturn(mock(Future.class, RETURNS_MOCKS));
+			any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
 		rpc.registerGateway(jobManagerAddress, jobMasterGateway);
@@ -906,7 +903,7 @@ public class TaskExecutorTest extends TestLogger {
 			any(String.class),
 			eq(resourceId),
 			any(SlotReport.class),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
+			any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
 
 		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
 		final int blobPort = 42;
@@ -923,12 +920,12 @@ public class TaskExecutorTest extends TestLogger {
 				eq(taskManagerLocation),
 				eq(jobManagerLeaderId),
 				any(Time.class)
-		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		)).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
 		when(jobMasterGateway.offerSlots(
 				any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)))
-			.thenReturn(FlinkCompletableFuture.completed((Iterable<SlotOffer>)Collections.singleton(offer1)));
+			.thenReturn(CompletableFuture.completedFuture((Iterable<SlotOffer>)Collections.singleton(offer1)));
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
 		rpc.registerGateway(jobManagerAddress, jobMasterGateway);
@@ -1120,7 +1117,7 @@ public class TaskExecutorTest extends TestLogger {
 			eq(resourceId),
 			any(SlotReport.class),
 			any(Time.class))).thenReturn(
-				FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
+				CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
 
 		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
 		final int blobPort = 42;
@@ -1137,7 +1134,7 @@ public class TaskExecutorTest extends TestLogger {
 			eq(taskManagerLocation),
 			eq(jobManagerLeaderId),
 			any(Time.class)
-		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		)).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
 
@@ -1224,7 +1221,7 @@ public class TaskExecutorTest extends TestLogger {
 				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 				Collections.<InputGateDeploymentDescriptor>emptyList());
 
-			CompletableFuture<Iterable<SlotOffer>> offerResultFuture = new FlinkCompletableFuture<>();
+			CompletableFuture<Iterable<SlotOffer>> offerResultFuture = new CompletableFuture<>();
 
 			// submit task first and then return acceptance response
 			when(

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index e790ea8..fdae251 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -35,8 +35,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -110,6 +108,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
@@ -1612,7 +1611,7 @@ public class TaskManagerTest extends TestLogger {
 
 				Await.result(result, timeout);
 
-				org.apache.flink.runtime.concurrent.Future<Boolean> cancelFuture = TestInvokableRecordCancel.gotCanceled();
+				CompletableFuture<Boolean> cancelFuture = TestInvokableRecordCancel.gotCanceled();
 
 				assertEquals(true, cancelFuture.get());
 			} finally {
@@ -2070,7 +2069,7 @@ public class TaskManagerTest extends TestLogger {
 	public static final class TestInvokableRecordCancel extends AbstractInvokable {
 
 		private static final Object lock = new Object();
-		private static CompletableFuture<Boolean> gotCanceledFuture = new FlinkCompletableFuture<>();
+		private static CompletableFuture<Boolean> gotCanceledFuture = new CompletableFuture<>();
 
 		@Override
 		public void invoke() throws Exception {
@@ -2099,11 +2098,11 @@ public class TaskManagerTest extends TestLogger {
 
 		public static void resetGotCanceledFuture() {
 			synchronized (lock) {
-				gotCanceledFuture = new FlinkCompletableFuture<>();
+				gotCanceledFuture = new CompletableFuture<>();
 			}
 		}
 
-		public static org.apache.flink.runtime.concurrent.Future<Boolean> gotCanceled() {
+		public static CompletableFuture<Boolean> gotCanceled() {
 			synchronized (lock) {
 				return gotCanceledFuture;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
index 5832b89..f3b68c4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -34,7 +33,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -92,17 +91,20 @@ public class OrderedStreamElementQueueTest extends TestLogger {
 			queue.put(entry);
 		}
 
-		Future<List<AsyncResult>> pollOperation = FlinkFuture.supplyAsync(new Callable<List<AsyncResult>>() {
-			@Override
-			public List<AsyncResult> call() throws Exception {
+		CompletableFuture<List<AsyncResult>> pollOperation = CompletableFuture.supplyAsync(
+			() -> {
 				List<AsyncResult> result = new ArrayList<>(4);
 				while (!queue.isEmpty()) {
-					result.add(queue.poll());
+					try {
+						result.add(queue.poll());
+					} catch (InterruptedException e) {
+						throw new FlinkFutureException(e);
+					}
 				}
 
 				return result;
-			}
-		}, executor);
+			},
+			executor);
 
 		Thread.sleep(10L);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
index fe9db95..d396756 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -37,7 +36,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -176,14 +175,15 @@ public class StreamElementQueueTest extends TestLogger {
 
 		Assert.assertEquals(1, queue.size());
 
-		Future<Void> putOperation = FlinkFuture.supplyAsync(new Callable<Void>() {
-			@Override
-			public Void call() throws Exception {
-				queue.put(streamRecordQueueEntry2);
-
-				return null;
-			}
-		}, executor);
+		CompletableFuture<Void> putOperation = CompletableFuture.runAsync(
+			() -> {
+				try {
+					queue.put(streamRecordQueueEntry2);
+				} catch (InterruptedException e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 
 		// give the future a chance to complete
 		Thread.sleep(10L);
@@ -215,12 +215,15 @@ public class StreamElementQueueTest extends TestLogger {
 
 		Assert.assertTrue(queue.isEmpty());
 
-		Future<AsyncResult> peekOperation = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
-			@Override
-			public AsyncResult call() throws Exception {
-				return queue.peekBlockingly();
-			}
-		}, executor);
+		CompletableFuture<AsyncResult> peekOperation = CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return queue.peekBlockingly();
+				} catch (InterruptedException e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 
 		Thread.sleep(10L);
 
@@ -236,12 +239,15 @@ public class StreamElementQueueTest extends TestLogger {
 		Assert.assertEquals(watermarkQueueEntry, queue.poll());
 		Assert.assertTrue(queue.isEmpty());
 
-		Future<AsyncResult> pollOperation = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
-			@Override
-			public AsyncResult call() throws Exception {
-				return queue.poll();
-			}
-		}, executor);
+		CompletableFuture<AsyncResult> pollOperation = CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return queue.poll();
+				} catch (InterruptedException e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 
 		Thread.sleep(10L);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
index ba6ce42..cc0bc30 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -35,7 +34,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -100,12 +99,15 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
 
 		Assert.assertTrue(8 == queue.size());
 
-		Future<AsyncResult> firstPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
-			@Override
-			public AsyncResult call() throws Exception {
-				return queue.poll();
-			}
-		}, executor);
+		CompletableFuture<AsyncResult> firstPoll = CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return queue.poll();
+				} catch (InterruptedException e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 
 		// this should not fulfill the poll, because R3 is behind W1
 		record3.collect(Collections.<Integer>emptyList());
@@ -118,12 +120,15 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
 
 		Assert.assertEquals(record2, firstPoll.get());
 
-		Future<AsyncResult> secondPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
-			@Override
-			public AsyncResult call() throws Exception {
-				return queue.poll();
-			}
-		}, executor);
+		CompletableFuture<AsyncResult> secondPoll = CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return queue.poll();
+				} catch (InterruptedException e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 
 		record6.collect(Collections.<Integer>emptyList());
 		record4.collect(Collections.<Integer>emptyList());
@@ -161,12 +166,15 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
 		// only R5 left in the queue
 		Assert.assertTrue(1 == queue.size());
 
-		Future<AsyncResult> thirdPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
-			@Override
-			public AsyncResult call() throws Exception {
-				return queue.poll();
-			}
-		}, executor);
+		CompletableFuture<AsyncResult> thirdPoll = CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return queue.poll();
+				} catch (InterruptedException e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 
 		Thread.sleep(10L);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index f021b38..4f2135d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.Environment;
@@ -75,6 +73,7 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.Collections;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.FutureTask;
 
 import static org.junit.Assert.assertEquals;
@@ -160,14 +159,9 @@ public class StreamTaskTerminationTest extends TestLogger {
 			mock(PartitionProducerStateChecker.class),
 			Executors.directExecutor());
 
-		Future<Void> taskRun = FlinkCompletableFuture.supplyAsync(new Callable<Void>() {
-			@Override
-			public Void call() throws Exception {
-				task.run();
-
-				return null;
-			}
-		}, TestingUtils.defaultExecutor());
+		CompletableFuture<Void> taskRun = CompletableFuture.runAsync(
+			() -> task.run(),
+			TestingUtils.defaultExecutor());
 
 		// wait until the stream task started running
 		RUN_LATCH.await();


[3/3] flink git commit: [FLINK-7334] [futures] Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService

Posted by tr...@apache.org.
[FLINK-7334] [futures] Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService

Remove Futures from RpcGateways

Remove Future usage

Fix failing AkkaRpcActorTest

This closes #4462.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eddafc1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eddafc1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eddafc1a

Branch: refs/heads/master
Commit: eddafc1ac9e4d787df44b63809f0d6dfd1f3def7
Parents: 74e479e
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Aug 1 11:33:48 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Aug 3 13:45:38 2017 +0200

----------------------------------------------------------------------
 .../MesosResourceManagerTest.java               |   6 +-
 .../handlers/TaskManagerLogHandler.java         |   4 +-
 .../handlers/TaskManagerLogHandlerTest.java     |   5 +-
 .../runtime/dispatcher/DispatcherGateway.java   |   6 +-
 .../flink/runtime/executiongraph/Execution.java |  25 ++--
 .../apache/flink/runtime/instance/SlotPool.java |  17 +--
 .../flink/runtime/instance/SlotPoolGateway.java |   8 +-
 .../slots/ActorTaskManagerGateway.java          |  38 ++---
 .../jobmanager/slots/TaskManagerGateway.java    |  19 +--
 .../flink/runtime/jobmaster/JobMaster.java      |  93 ++++++-------
 .../runtime/jobmaster/JobMasterGateway.java     |  18 +--
 .../jobmaster/RpcTaskManagerGateway.java        |  25 ++--
 .../registration/RetryingRegistration.java      |   4 +-
 .../resourcemanager/JobLeaderIdService.java     |  16 +--
 .../resourcemanager/ResourceManager.java        |  29 ++--
 .../resourcemanager/ResourceManagerGateway.java |  10 +-
 .../slotmanager/SlotManager.java                |  16 +--
 .../flink/runtime/rpc/MainThreadExecutable.java |   4 +-
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |   6 +-
 .../apache/flink/runtime/rpc/RpcService.java    |  10 +-
 .../apache/flink/runtime/rpc/SelfGateway.java   |   4 +-
 .../runtime/rpc/akka/AkkaInvocationHandler.java |  30 ++--
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    |  36 ++---
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |  28 ++--
 .../runtime/taskexecutor/JobLeaderService.java  |   5 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  20 ++-
 .../taskexecutor/TaskExecutorGateway.java       |  16 +--
 ...TaskExecutorToResourceManagerConnection.java |   3 +-
 .../runtime/taskexecutor/TaskManagerRunner.java |   4 +-
 .../taskexecutor/rpc/RpcInputSplitProvider.java |   4 +-
 .../rpc/RpcPartitionStateChecker.java           |   3 +-
 .../RpcResultPartitionConsumableNotifier.java   |  22 ++-
 .../flink/runtime/jobmanager/JobManager.scala   |  17 +--
 .../runtime/akka/QuarantineMonitorTest.java     |  16 +--
 .../CheckpointCoordinatorMasterHooksTest.java   |   5 +-
 .../clusterframework/ResourceManagerTest.java   |   6 +-
 .../runtime/dispatcher/DispatcherTest.java      |   5 +-
 .../ExecutionGraphMetricsTest.java              |   3 +-
 .../ExecutionGraphSchedulingTest.java           |   3 +-
 .../executiongraph/ExecutionGraphStopTest.java  |   7 +-
 .../utils/NotCancelAckingTaskGateway.java       |   9 +-
 .../utils/SimpleAckingTaskManagerGateway.java   |  36 ++---
 .../flink/runtime/instance/SlotPoolRpcTest.java |   4 +-
 .../flink/runtime/instance/SlotPoolTest.java    |  20 +--
 .../flink/runtime/jobmaster/JobMasterTest.java  |   5 +-
 .../registration/RetryingRegistrationTest.java  |  13 +-
 .../registration/TestRegistrationGateway.java   |   7 +-
 .../resourcemanager/JobLeaderIdServiceTest.java |   8 +-
 .../ResourceManagerJobMasterTest.java           |  22 +--
 .../ResourceManagerTaskExecutorTest.java        |  10 +-
 .../slotmanager/SlotManagerTest.java            | 138 +++++++------------
 .../slotmanager/SlotProtocolTest.java           |   6 +-
 .../flink/runtime/rpc/AsyncCallsTest.java       |  12 +-
 .../flink/runtime/rpc/RpcCompletenessTest.java  |   6 +-
 .../flink/runtime/rpc/RpcConnectionTest.java    |   4 +-
 .../flink/runtime/rpc/TestingGatewayBase.java   |   9 +-
 .../flink/runtime/rpc/TestingRpcService.java    |  13 +-
 .../runtime/rpc/TestingSerialRpcService.java    |  37 +++--
 .../runtime/rpc/akka/AkkaRpcActorTest.java      |  46 +++----
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  25 +---
 .../rpc/akka/MessageSerializationTest.java      |   8 +-
 .../taskexecutor/TaskExecutorITCase.java        |   9 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  39 +++---
 .../runtime/taskmanager/TaskManagerTest.java    |  11 +-
 .../queue/OrderedStreamElementQueueTest.java    |  20 +--
 .../async/queue/StreamElementQueueTest.java     |  52 +++----
 .../queue/UnorderedStreamElementQueueTest.java  |  50 ++++---
 .../tasks/StreamTaskTerminationTest.java        |  14 +-
 68 files changed, 559 insertions(+), 670 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 6e6a59c..e63b4ab 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
@@ -89,6 +88,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
@@ -414,7 +414,7 @@ public class MesosResourceManagerTest extends TestLogger {
 		 * Register a job master with the RM.
 		 */
 		public void registerJobMaster(MockJobMaster jobMaster) throws Exception  {
-			Future<RegistrationResponse> registration = resourceManager.registerJobManager(
+			CompletableFuture<RegistrationResponse> registration = resourceManager.registerJobManager(
 				rmServices.rmLeaderSessionId, jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, jobMaster.jobID);
 			assertTrue(registration.get() instanceof JobMasterRegistrationSuccess);
 		}
@@ -588,7 +588,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
 
 			// send registration message
-			Future<RegistrationResponse> successfulFuture =
+			CompletableFuture<RegistrationResponse> successfulFuture =
 				resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, task1Executor.address, task1Executor.resourceID, slotReport);
 			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 9cbb71d..ce29721 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -196,10 +196,10 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
 
 						switch (fileMode) {
 							case LOG:
-								return FutureUtils.toJava(taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout));
+								return taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout);
 							case STDOUT:
 							default:
-								return FutureUtils.toJava(taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout));
+								return taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout);
 						}
 					}
 				);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
index bfcaf88..e1c3686 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -25,9 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -50,6 +48,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 
 import scala.Option;
@@ -109,7 +108,7 @@ public class TaskManagerLogHandlerTest {
 		when(taskManager.getId()).thenReturn(tmID);
 		when(taskManager.getTaskManagerID()).thenReturn(tmRID);
 		when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
-		CompletableFuture<BlobKey> future = new FlinkCompletableFuture<>();
+		CompletableFuture<BlobKey> future = new CompletableFuture<>();
 		future.completeExceptionally(new IOException("failure"));
 		when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index c730bc1..33b8a42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -20,13 +20,13 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Gateway for the Dispatcher component.
@@ -40,7 +40,7 @@ public interface DispatcherGateway extends RpcGateway {
 	 * @param timeout RPC timeout
 	 * @return A future acknowledge if the submission succeeded
 	 */
-	Future<Acknowledge> submitJob(
+	CompletableFuture<Acknowledge> submitJob(
 		JobGraph jobGraph,
 		@RpcTimeout Time timeout);
 
@@ -50,6 +50,6 @@ public interface DispatcherGateway extends RpcGateway {
 	 * @param timeout RPC timeout
 	 * @return A future collection of currently submitted jobs
 	 */
-	Future<Collection<JobID>> listJobs(
+	CompletableFuture<Collection<JobID>> listJobs(
 		@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 5cb12ea..bd5bc7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -418,8 +418,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
-			final CompletableFuture<Acknowledge> submitResultFuture = FutureUtils.toJava(
-				taskManagerGateway.submitTask(deployment, timeout));
+			final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
 
 			submitResultFuture.whenCompleteAsync(
 				(ack, failure) -> {
@@ -454,7 +453,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
 			CompletableFuture<Acknowledge> stopResultFuture = FutureUtils.retry(
-				() -> FutureUtils.toJava(taskManagerGateway.stopTask(attemptId, timeout)),
+				() -> taskManagerGateway.stopTask(attemptId, timeout),
 				NUM_STOP_CALL_TRIES,
 				executor);
 
@@ -679,14 +678,13 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
-			return FutureUtils.toJava(
-				taskManagerGateway.requestStackTraceSample(
-					attemptId,
-					sampleId,
-					numSamples,
-					delayBetweenSamples,
-					maxStrackTraceDepth,
-					timeout));
+			return taskManagerGateway.requestStackTraceSample(
+				attemptId,
+				sampleId,
+				numSamples,
+				delayBetweenSamples,
+				maxStrackTraceDepth,
+				timeout);
 		} else {
 			return FutureUtils.completedExceptionally(new Exception("The execution has no slot assigned."));
 		}
@@ -1011,7 +1009,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
 			CompletableFuture<Acknowledge> cancelResultFuture = FutureUtils.retry(
-				() -> FutureUtils.toJava(taskManagerGateway.cancelTask(attemptId, timeout)),
+				() -> taskManagerGateway.cancelTask(attemptId, timeout),
 				NUM_CANCEL_CALL_TRIES,
 				executor);
 
@@ -1050,8 +1048,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 			final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
 
-			CompletableFuture<Acknowledge> updatePartitionsResultFuture = FutureUtils.toJava(
-				taskManagerGateway.updatePartitions(attemptId, partitionInfos, timeout));
+			CompletableFuture<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, timeout);
 
 			updatePartitionsResultFuture.whenCompleteAsync(
 				(ack, failure) -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 9a26779..508e54f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -25,8 +25,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -263,12 +261,12 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 	// ------------------------------------------------------------------------
 
 	@RpcMethod
-	public Future<SimpleSlot> allocateSlot(
+	public CompletableFuture<SimpleSlot> allocateSlot(
 			ScheduledUnit task,
 			ResourceProfile resources,
 			Iterable<TaskManagerLocation> locationPreferences) {
 
-		return FutureUtils.toFlinkFuture(internalAllocateSlot(task, resources, locationPreferences));
+		return internalAllocateSlot(task, resources, locationPreferences);
 	}
 
 	@RpcMethod
@@ -316,11 +314,10 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
 		pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources));
 
-		CompletableFuture<Acknowledge> rmResponse = FutureUtils.toJava(
-			resourceManagerGateway.requestSlot(
-				jobManagerLeaderId, resourceManagerLeaderId,
-				new SlotRequest(jobId, allocationID, resources, jobManagerAddress),
-				resourceManagerRequestsTimeout));
+		CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
+			jobManagerLeaderId, resourceManagerLeaderId,
+			new SlotRequest(jobId, allocationID, resources, jobManagerAddress),
+			resourceManagerRequestsTimeout);
 
 		CompletableFuture<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(
 			(Acknowledge value) -> {
@@ -984,7 +981,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 			Iterable<TaskManagerLocation> locationPreferences = 
 					task.getTaskToExecute().getVertex().getPreferredLocations();
 
-			return FutureUtils.toJava(gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout));
+			return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index 42942ca..43f407a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * The gateway for calls on the {@link SlotPool}. 
@@ -75,9 +75,9 @@ public interface SlotPoolGateway extends RpcGateway {
 
 	void releaseTaskManager(ResourceID resourceID);
 
-	Future<Boolean> offerSlot(AllocatedSlot slot);
+	CompletableFuture<Boolean> offerSlot(AllocatedSlot slot);
 
-	Future<Iterable<SlotOffer>> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers);
+	CompletableFuture<Iterable<SlotOffer>> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers);
 	
 	void failAllocation(AllocationID allocationID, Exception cause);
 
@@ -85,7 +85,7 @@ public interface SlotPoolGateway extends RpcGateway {
 	//  allocating and disposing slots
 	// ------------------------------------------------------------------------
 
-	Future<SimpleSlot> allocateSlot(
+	CompletableFuture<SimpleSlot> allocateSlot(
 			ScheduledUnit task,
 			ResourceProfile resources,
 			Iterable<TaskManagerLocation> locationPreferences,

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
index 2876ebe..a773ce9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -24,8 +24,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -41,6 +40,9 @@ import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
 
@@ -78,7 +80,7 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 	}
 
 	@Override
-	public Future<StackTrace> requestStackTrace(final Time timeout) {
+	public CompletableFuture<StackTrace> requestStackTrace(final Time timeout) {
 		Preconditions.checkNotNull(timeout);
 
 		scala.concurrent.Future<StackTrace> stackTraceFuture = actorGateway.ask(
@@ -86,11 +88,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 			new FiniteDuration(timeout.getSize(), timeout.getUnit()))
 			.mapTo(ClassTag$.MODULE$.<StackTrace>apply(StackTrace.class));
 
-		return new FlinkFuture<>(stackTraceFuture);
+		return FutureUtils.toJava(stackTraceFuture);
 	}
 
 	@Override
-	public Future<StackTraceSampleResponse> requestStackTraceSample(
+	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
 			ExecutionAttemptID executionAttemptID,
 			int sampleId,
 			int numSamples,
@@ -113,11 +115,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 			new FiniteDuration(timeout.getSize(), timeout.getUnit()))
 			.mapTo(ClassTag$.MODULE$.<StackTraceSampleResponse>apply(StackTraceSampleResponse.class));
 
-		return new FlinkFuture<>(stackTraceSampleResponseFuture);
+		return FutureUtils.toJava(stackTraceSampleResponseFuture);
 	}
 
 	@Override
-	public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
 		Preconditions.checkNotNull(tdd);
 		Preconditions.checkNotNull(timeout);
 
@@ -126,11 +128,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 			new FiniteDuration(timeout.getSize(), timeout.getUnit()))
 			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
 
-		return new FlinkFuture<>(submitResult);
+		return FutureUtils.toJava(submitResult);
 	}
 
 	@Override
-	public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+	public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
 		Preconditions.checkNotNull(executionAttemptID);
 		Preconditions.checkNotNull(timeout);
 
@@ -139,11 +141,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 			new FiniteDuration(timeout.getSize(), timeout.getUnit()))
 			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
 
-		return new FlinkFuture<>(stopResult);
+		return FutureUtils.toJava(stopResult);
 	}
 
 	@Override
-	public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
 		Preconditions.checkNotNull(executionAttemptID);
 		Preconditions.checkNotNull(timeout);
 
@@ -152,11 +154,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 			new FiniteDuration(timeout.getSize(), timeout.getUnit()))
 			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
 
-		return new FlinkFuture<>(cancelResult);
+		return FutureUtils.toJava(cancelResult);
 	}
 
 	@Override
-	public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+	public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
 		Preconditions.checkNotNull(executionAttemptID);
 		Preconditions.checkNotNull(partitionInfos);
 
@@ -169,7 +171,7 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 			new FiniteDuration(timeout.getSize(), timeout.getUnit()))
 			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
 
-		return new FlinkFuture<>(updatePartitionsResult);
+		return FutureUtils.toJava(updatePartitionsResult);
 	}
 
 	@Override
@@ -207,16 +209,16 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 	}
 
 	@Override
-	public Future<BlobKey> requestTaskManagerLog(Time timeout) {
+	public CompletableFuture<BlobKey> requestTaskManagerLog(Time timeout) {
 		return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerLog(), timeout);
 	}
 
 	@Override
-	public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
+	public CompletableFuture<BlobKey> requestTaskManagerStdout(Time timeout) {
 		return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
 	}
 
-	private Future<BlobKey> requestTaskManagerLog(TaskManagerMessages.RequestTaskManagerLog request, Time timeout) {
+	private CompletableFuture<BlobKey> requestTaskManagerLog(TaskManagerMessages.RequestTaskManagerLog request, Time timeout) {
 		Preconditions.checkNotNull(request);
 		Preconditions.checkNotNull(timeout);
 
@@ -226,6 +228,6 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 				new FiniteDuration(timeout.getSize(), timeout.getUnit()))
 			.mapTo(ClassTag$.MODULE$.<BlobKey>apply(BlobKey.class));
 
-		return new FlinkFuture<>(blobKeyFuture);
+		return FutureUtils.toJava(blobKeyFuture);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index 09f104f..36cea86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -32,6 +31,8 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * Task manager gateway interface to communicate with the task manager.
  */
@@ -66,7 +67,7 @@ public interface TaskManagerGateway {
 	 * @param timeout for the stack trace request
 	 * @return Future for a stack trace
 	 */
-	Future<StackTrace> requestStackTrace(final Time timeout);
+	CompletableFuture<StackTrace> requestStackTrace(final Time timeout);
 
 	/**
 	 * Request a stack trace sample from the given task.
@@ -79,7 +80,7 @@ public interface TaskManagerGateway {
 	 * @param timeout of the request
 	 * @return Future of stack trace sample response
 	 */
-	Future<StackTraceSampleResponse> requestStackTraceSample(
+	CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
 		final ExecutionAttemptID executionAttemptID,
 		final int sampleId,
 		final int numSamples,
@@ -94,7 +95,7 @@ public interface TaskManagerGateway {
 	 * @param timeout of the submit operation
 	 * @return Future acknowledge of the successful operation
 	 */
-	Future<Acknowledge> submitTask(
+	CompletableFuture<Acknowledge> submitTask(
 		TaskDeploymentDescriptor tdd,
 		Time timeout);
 
@@ -105,7 +106,7 @@ public interface TaskManagerGateway {
 	 * @param timeout of the submit operation
 	 * @return Future acknowledge if the task is successfully stopped
 	 */
-	Future<Acknowledge> stopTask(
+	CompletableFuture<Acknowledge> stopTask(
 		ExecutionAttemptID executionAttemptID,
 		Time timeout);
 
@@ -116,7 +117,7 @@ public interface TaskManagerGateway {
 	 * @param timeout of the submit operation
 	 * @return Future acknowledge if the task is successfully canceled
 	 */
-	Future<Acknowledge> cancelTask(
+	CompletableFuture<Acknowledge> cancelTask(
 		ExecutionAttemptID executionAttemptID,
 		Time timeout);
 
@@ -128,7 +129,7 @@ public interface TaskManagerGateway {
 	 * @param timeout of the submit operation
 	 * @return Future acknowledge if the partitions have been successfully updated
 	 */
-	Future<Acknowledge> updatePartitions(
+	CompletableFuture<Acknowledge> updatePartitions(
 		ExecutionAttemptID executionAttemptID,
 		Iterable<PartitionInfo> partitionInfos,
 		Time timeout);
@@ -176,7 +177,7 @@ public interface TaskManagerGateway {
 	 * @param timeout for the request
 	 * @return Future blob key under which the task manager log has been stored
 	 */
-	Future<BlobKey> requestTaskManagerLog(final Time timeout);
+	CompletableFuture<BlobKey> requestTaskManagerLog(final Time timeout);
 
 	/**
 	 * Request the task manager stdout from the task manager.
@@ -184,5 +185,5 @@ public interface TaskManagerGateway {
 	 * @param timeout for the request
 	 * @return Future blob key under which the task manager stdout file has been stored
 	 */
-	Future<BlobKey> requestTaskManagerStdout(final Time timeout);
+	CompletableFuture<BlobKey> requestTaskManagerStdout(final Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7922baa..e2e117a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -36,10 +36,6 @@ import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -105,8 +101,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -684,7 +680,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	@RpcMethod
-	public Future<Iterable<SlotOffer>> offerSlots(
+	public CompletableFuture<Iterable<SlotOffer>> offerSlots(
 			final ResourceID taskManagerId,
 			final Iterable<SlotOffer> slots,
 			final UUID leaderId) throws Exception {
@@ -736,7 +732,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	@RpcMethod
-	public Future<RegistrationResponse> registerTaskManager(
+	public CompletableFuture<RegistrationResponse> registerTaskManager(
 			final String taskManagerRpcAddress,
 			final TaskManagerLocation taskManagerLocation,
 			final UUID leaderId) throws Exception
@@ -755,48 +751,42 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		if (registeredTaskManagers.containsKey(taskManagerId)) {
 			final RegistrationResponse response = new JMTMRegistrationSuccess(
 				resourceId, libraryCacheManager.getBlobServerPort());
-			return FlinkCompletableFuture.completed(response);
+			return CompletableFuture.completedFuture(response);
 		} else {
-			return getRpcService().execute(new Callable<TaskExecutorGateway>() {
-				@Override
-				public TaskExecutorGateway call() throws Exception {
-					return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
-							.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
-				}
-			}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
-				@Override
-				public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
-					if (throwable != null) {
-						return new RegistrationResponse.Decline(throwable.getMessage());
-					}
-
-					if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
-						log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
-										"leader session ID {} did not equal the received leader session ID {}.",
-								taskManagerId, taskManagerRpcAddress,
-								JobMaster.this.leaderSessionID, leaderId);
-						return new RegistrationResponse.Decline("Invalid leader session id");
-					}
-
-					slotPoolGateway.registerTaskManager(taskManagerId);
-					registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
-
-					// monitor the task manager as heartbeat target
-					taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
-						@Override
-						public void receiveHeartbeat(ResourceID resourceID, Void payload) {
-							// the task manager will not request heartbeat, so this method will never be called currently
+			return getRpcService()
+				.connect(taskManagerRpcAddress, TaskExecutorGateway.class)
+				.handleAsync(
+					(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
+						if (throwable != null) {
+							return new RegistrationResponse.Decline(throwable.getMessage());
 						}
 
-						@Override
-						public void requestHeartbeat(ResourceID resourceID, Void payload) {
-							taskExecutorGateway.heartbeatFromJobManager(resourceID);
+						if (!Objects.equals(leaderSessionID, leaderId)) {
+							log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
+									"leader session ID {} did not equal the received leader session ID {}.",
+								taskManagerId, taskManagerRpcAddress, leaderSessionID, leaderId);
+							return new RegistrationResponse.Decline("Invalid leader session id");
 						}
-					});
 
-					return new JMTMRegistrationSuccess(resourceId, libraryCacheManager.getBlobServerPort());
-				}
-			}, getMainThreadExecutor());
+						slotPoolGateway.registerTaskManager(taskManagerId);
+						registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
+
+						// monitor the task manager as heartbeat target
+						taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
+							@Override
+							public void receiveHeartbeat(ResourceID resourceID, Void payload) {
+								// the task manager will not request heartbeat, so this method will never be called currently
+							}
+
+							@Override
+							public void requestHeartbeat(ResourceID resourceID, Void payload) {
+								taskExecutorGateway.heartbeatFromJobManager(resourceID);
+							}
+						});
+
+						return new JMTMRegistrationSuccess(resourceId, libraryCacheManager.getBlobServerPort());
+					},
+					getMainThreadExecutor());
 		}
 	}
 
@@ -1051,14 +1041,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				{
 					Time timeout = Time.milliseconds(timeoutMillis);
 
-					return FutureUtils.toJava(
-						gateway.registerJobManager(
-							leaderId,
-							jobManagerLeaderID,
-							jobManagerResourceID,
-							jobManagerRpcAddress,
-							jobID,
-							timeout));
+					return gateway.registerJobManager(
+						leaderId,
+						jobManagerLeaderID,
+						jobManagerResourceID,
+						jobManagerRpcAddress,
+						jobID,
+						timeout);
 				}
 			};
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 5a271f9..e3611a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -45,6 +44,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * {@link JobMaster} rpc gateway interface
@@ -68,7 +68,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param taskExecutionState New task execution state for a given task
 	 * @return Future flag of the task execution state update result
 	 */
-	Future<Acknowledge> updateTaskExecutionState(
+	CompletableFuture<Acknowledge> updateTaskExecutionState(
 			final UUID leaderSessionID,
 			final TaskExecutionState taskExecutionState);
 
@@ -81,7 +81,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param executionAttempt The execution attempt id
 	 * @return The future of the input split. If there is no further input split, will return an empty object.
 	 */
-	Future<SerializedInputSplit> requestNextInputSplit(
+	CompletableFuture<SerializedInputSplit> requestNextInputSplit(
 			final UUID leaderSessionID,
 			final JobVertexID vertexID,
 			final ExecutionAttemptID executionAttempt);
@@ -95,7 +95,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param partitionId          The partition ID of the partition to request the state of.
 	 * @return The future of the partition state
 	 */
-	Future<ExecutionState> requestPartitionState(
+	CompletableFuture<ExecutionState> requestPartitionState(
 			final UUID leaderSessionID,
 			final IntermediateDataSetID intermediateResultId,
 			final ResultPartitionID partitionId);
@@ -114,7 +114,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param timeout         before the rpc call fails
 	 * @return Future acknowledge of the schedule or update operation
 	 */
-	Future<Acknowledge> scheduleOrUpdateConsumers(
+	CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(
 			final UUID leaderSessionID,
 			final ResultPartitionID partitionID,
 			@RpcTimeout final Time timeout);
@@ -146,7 +146,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param registrationName Name under which the KvState has been registered.
 	 * @return Future of the requested {@link InternalKvState} location
 	 */
-	Future<KvStateLocation> lookupKvStateLocation(final String registrationName);
+	CompletableFuture<KvStateLocation> lookupKvStateLocation(final String registrationName);
 
 	/**
 	 * @param jobVertexId          JobVertexID the KvState instance belongs to.
@@ -175,7 +175,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	/**
 	 * Request the classloading props of this job.
 	 */
-	Future<ClassloadingProps> requestClassloadingProps();
+	CompletableFuture<ClassloadingProps> requestClassloadingProps();
 
 	/**
 	 * Offer the given slots to the job manager. The response contains the set of accepted slots.
@@ -186,7 +186,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param timeout       for the rpc call
 	 * @return Future set of accepted slots.
 	 */
-	Future<Iterable<SlotOffer>> offerSlots(
+	CompletableFuture<Iterable<SlotOffer>> offerSlots(
 			final ResourceID taskManagerId,
 			final Iterable<SlotOffer> slots,
 			final UUID leaderId,
@@ -214,7 +214,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param timeout               for the rpc call
 	 * @return Future registration response indicating whether the registration was successful or not
 	 */
-	Future<RegistrationResponse> registerTaskManager(
+	CompletableFuture<RegistrationResponse> registerTaskManager(
 			final String taskManagerRpcAddress,
 			final TaskManagerLocation taskManagerLocation,
 			final UUID leaderId,

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 28fef27..e93c907 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.Preconditions;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Implementation of the {@link TaskManagerGateway} for Flink's RPC system
@@ -68,47 +68,40 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
 	}
 
 	@Override
-	public Future<StackTrace> requestStackTrace(Time timeout) {
+	public CompletableFuture<StackTrace> requestStackTrace(Time timeout) {
 //		return taskExecutorGateway.requestStackTrace(timeout);
 		throw new UnsupportedOperationException("Operation is not yet supported.");
 	}
 
 	@Override
-	public Future<StackTraceSampleResponse> requestStackTraceSample(
+	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
 			ExecutionAttemptID executionAttemptID,
 			int sampleId,
 			int numSamples,
 			Time delayBetweenSamples,
 			int maxStackTraceDepth,
 			Time timeout) {
-//		return taskExecutorGateway.requestStackTraceSample(
-//			executionAttemptID,
-//			sampleId,
-//			numSamples,
-//			delayBetweenSamples,
-//			maxStackTraceDepth,
-//			timeout);
 
 		throw new UnsupportedOperationException("Operation is not yet supported.");
 	}
 
 	@Override
-	public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
 		return taskExecutorGateway.submitTask(tdd, leaderId, timeout);
 	}
 
 	@Override
-	public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+	public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
 		return taskExecutorGateway.stopTask(executionAttemptID, timeout);
 	}
 
 	@Override
-	public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
 		return taskExecutorGateway.cancelTask(executionAttemptID, timeout);
 	}
 
 	@Override
-	public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+	public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
 		return taskExecutorGateway.updatePartitions(executionAttemptID, partitionInfos, timeout);
 	}
 
@@ -130,13 +123,13 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
 	}
 
 	@Override
-	public Future<BlobKey> requestTaskManagerLog(Time timeout) {
+	public CompletableFuture<BlobKey> requestTaskManagerLog(Time timeout) {
 //		return taskExecutorGateway.requestTaskManagerLog(timeout);
 		throw new UnsupportedOperationException("Operation is not yet supported.");
 	}
 
 	@Override
-	public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
+	public CompletableFuture<BlobKey> requestTaskManagerStdout(Time timeout) {
 //		return taskExecutorGateway.requestTaskManagerStdout(timeout);
 		throw new UnsupportedOperationException("Operation is not yet supported.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index 1034a89..6a18ffd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.registration;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
 
@@ -176,8 +175,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 	public void startRegistration() {
 		try {
 			// trigger resolution of the resource manager address to a callable gateway
-			CompletableFuture<Gateway> resourceManagerFuture = FutureUtils.toJava(
-				rpcService.connect(targetAddress, targetType));
+			CompletableFuture<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType);
 
 			// upon success, start the registration attempts
 			CompletableFuture<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync(

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 8bffcd0..aaa72d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -20,10 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -37,7 +34,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -180,7 +178,7 @@ public class JobLeaderIdService {
 		return jobLeaderIdListeners.containsKey(jobId);
 	}
 
-	public Future<UUID> getLeaderId(JobID jobId) throws Exception {
+	public CompletableFuture<UUID> getLeaderId(JobID jobId) throws Exception {
 		if (!jobLeaderIdListeners.containsKey(jobId)) {
 			addJob(jobId);
 		}
@@ -235,7 +233,7 @@ public class JobLeaderIdService {
 			this.listenerJobLeaderIdActions = Preconditions.checkNotNull(listenerJobLeaderIdActions);
 			this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
 
-			leaderIdFuture = new FlinkCompletableFuture<>();
+			leaderIdFuture = new CompletableFuture<>();
 
 			activateTimeout();
 
@@ -243,7 +241,7 @@ public class JobLeaderIdService {
 			leaderRetrievalService.start(this);
 		}
 
-		public Future<UUID> getLeaderIdFuture() {
+		public CompletableFuture<UUID> getLeaderIdFuture() {
 			return leaderIdFuture;
 		}
 
@@ -269,12 +267,12 @@ public class JobLeaderIdService {
 				if (leaderIdFuture.isDone()) {
 					try {
 						previousJobLeaderId = leaderIdFuture.getNow(null);
-					} catch (ExecutionException e) {
+					} catch (CompletionException e) {
 						// this should never happen since we complete this future always properly
 						handleError(e);
 					}
 
-					leaderIdFuture = FlinkCompletableFuture.completed(leaderSessionId);
+					leaderIdFuture = CompletableFuture.completedFuture(leaderSessionId);
 				} else {
 					leaderIdFuture.complete(leaderSessionId);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 5e33c0e..8318c09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManager;
@@ -246,7 +245,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	// ------------------------------------------------------------------------
 
 	@RpcMethod
-	public Future<RegistrationResponse> registerJobManager(
+	public CompletableFuture<RegistrationResponse> registerJobManager(
 			final UUID resourceManagerLeaderId,
 			final UUID jobManagerLeaderId,
 			final ResourceID jobManagerResourceId,
@@ -270,13 +269,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 					onFatalErrorAsync(exception);
 
 					log.error("Could not add job {} to job leader id service.", jobId, e);
-					return FutureUtils.toFlinkFuture(FutureUtils.completedExceptionally(exception));
+					return FutureUtils.completedExceptionally(exception);
 				}
 			}
 
 			log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);
 
-			Future<UUID> jobLeaderIdFuture;
+			CompletableFuture<UUID> jobLeaderIdFuture;
 
 			try {
 				jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
@@ -289,12 +288,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 				onFatalErrorAsync(exception);
 
 				log.debug("Could not obtain the job leader id future to verify the correct job leader.");
-				return FutureUtils.toFlinkFuture(FutureUtils.completedExceptionally(exception));
+				return FutureUtils.completedExceptionally(exception);
 			}
 
-			Future<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class);
+			CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class);
 
-			Future<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(
+			CompletableFuture<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(
 				jobLeaderIdFuture,
 				(JobMasterGateway jobMasterGateway, UUID jobLeaderId) -> {
 					if (isValid(resourceManagerLeaderId)) {
@@ -339,8 +338,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 				"{} did not match the expected leader id {}.", jobManagerAddress,
 				resourceManagerLeaderId, leaderSessionId);
 
-			return FutureUtils.toFlinkFuture(CompletableFuture.<RegistrationResponse>completedFuture(
-				new RegistrationResponse.Decline("Resource manager leader id did not match.")));
+			return CompletableFuture.completedFuture(
+				new RegistrationResponse.Decline("Resource manager leader id did not match."));
 		}
 	}
 
@@ -354,14 +353,14 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 * @return The response by the ResourceManager.
 	 */
 	@RpcMethod
-	public Future<RegistrationResponse> registerTaskExecutor(
+	public CompletableFuture<RegistrationResponse> registerTaskExecutor(
 			final UUID resourceManagerLeaderId,
 			final String taskExecutorAddress,
 			final ResourceID taskExecutorResourceId,
 			final SlotReport slotReport) {
 
 		if (Objects.equals(leaderSessionId, resourceManagerLeaderId)) {
-			Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+			CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
 
 			return taskExecutorGatewayFuture.handleAsync(
 				(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
@@ -381,10 +380,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 					"not equal the received leader session ID  {}",
 				taskExecutorResourceId, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId);
 
-			return FutureUtils.toFlinkFuture(CompletableFuture.<RegistrationResponse>completedFuture(
+			return CompletableFuture.completedFuture(
 				new RegistrationResponse.Decline("Discard registration because the leader id " +
 					resourceManagerLeaderId + " does not match the expected leader id " +
-					leaderSessionId + '.')));
+					leaderSessionId + '.'));
 		}
 	}
 
@@ -493,8 +492,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		if(infoMessageListeners.containsKey(address)) {
 			log.warn("Receive a duplicate registration from info message listener on ({})", address);
 		} else {
-			CompletableFuture<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = FutureUtils.toJava(
-				getRpcService().connect(address, InfoMessageListenerRpcGateway.class));
+			CompletableFuture<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService()
+				.connect(address, InfoMessageListenerRpcGateway.class);
 
 			infoMessageListenerRpcGatewayFuture.whenCompleteAsync(
 				(InfoMessageListenerRpcGateway gateway, Throwable failure) -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index eb091c4..1ba6893 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * The {@link ResourceManager}'s RPC gateway interface.
@@ -51,7 +51,7 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param timeout                 Timeout for the future to complete
 	 * @return Future registration response
 	 */
-	Future<RegistrationResponse> registerJobManager(
+	CompletableFuture<RegistrationResponse> registerJobManager(
 		UUID resourceManagerLeaderId,
 		UUID jobMasterLeaderId,
 		ResourceID jobMasterResourceId,
@@ -67,7 +67,7 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param slotRequest The slot to request
 	 * @return The confirmation that the slot gets allocated
 	 */
-	Future<Acknowledge> requestSlot(
+	CompletableFuture<Acknowledge> requestSlot(
 		UUID resourceManagerLeaderID,
 		UUID jobMasterLeaderID,
 		SlotRequest slotRequest,
@@ -84,7 +84,7 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 *
 	 * @return The future to the response by the ResourceManager.
 	 */
-	Future<RegistrationResponse> registerTaskExecutor(
+	CompletableFuture<RegistrationResponse> registerTaskExecutor(
 		UUID resourceManagerLeaderId,
 		String taskExecutorAddress,
 		ResourceID resourceID,
@@ -133,7 +133,7 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param leaderSessionId The leader session ID with which to address the ResourceManager.
 	 * @return The future to the number of registered TaskManagers.
 	 */
-	Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);
+	CompletableFuture<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);
 
 	/**
 	 * Sends the heartbeat to resource manager from task manager

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 8354525..3bda409 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -639,14 +638,13 @@ public class SlotManager implements AutoCloseable {
 		}
 
 		// RPC call to the task manager
-		CompletableFuture<Acknowledge> requestFuture = FutureUtils.toJava(
-			gateway.requestSlot(
-				slotId,
-				pendingSlotRequest.getJobId(),
-				allocationId,
-				pendingSlotRequest.getTargetAddress(),
-				leaderId,
-				taskManagerRequestTimeout));
+		CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
+			slotId,
+			pendingSlotRequest.getJobId(),
+			allocationId,
+			pendingSlotRequest.getTargetAddress(),
+			leaderId,
+			taskManagerRequestTimeout);
 
 		requestFuture.whenComplete(
 			(Acknowledge acknowledge, Throwable throwable) -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
index ec1c984..6e36bd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Future;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 
 /**
@@ -51,7 +51,7 @@ public interface MainThreadExecutable {
 	 * @param <V> Return value of the callable
 	 * @return Future of the callable result
 	 */
-	<V> Future<V> callAsync(Callable<V> callable, Time callTimeout);
+	<V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout);
 
 	/**
 	 * Execute the runnable in the main thread of the underlying RPC endpoint, with

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 331f3a3..b5bbc2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ReflectionUtil;
 import org.slf4j.Logger;
@@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -214,7 +214,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 *
 	 * @return Future which is completed when the rpc endpoint has been terminated.
 	 */
-	public Future<Void> getTerminationFuture() {
+	public CompletableFuture<Void> getTerminationFuture() {
 		return ((SelfGateway)self).getTerminationFuture();
 	}
 
@@ -264,7 +264,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * @param <V> Return type of the callable
 	 * @return Future for the result of the callable.
 	 */
-	protected <V> Future<V> callAsync(Callable<V> callable, Time timeout) {
+	protected <V> CompletableFuture<V> callAsync(Callable<V> callable, Time timeout) {
 		return ((MainThreadExecutable) self).callAsync(callable, timeout);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 51b7ca2..a92f3e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.runtime.rpc;
 
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -61,7 +61,7 @@ public interface RpcService {
 	 * @return Future containing the rpc gateway or an {@link RpcConnectionException} if the
 	 * connection attempt failed
 	 */
-	<C extends RpcGateway> Future<C> connect(String address, Class<C> clazz);
+	<C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);
 
 	/**
 	 * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint.
@@ -91,7 +91,7 @@ public interface RpcService {
 	 *
 	 * @return Termination future
 	 */
-	Future<Void> getTerminationFuture();
+	CompletableFuture<Void> getTerminationFuture();
 
 	/**
 	 * Gets the executor, provided by this RPC service. This executor can be used for example for
@@ -145,7 +145,7 @@ public interface RpcService {
 	void execute(Runnable runnable);
 
 	/**
-	 * Execute the given callable and return its result as a {@link Future}. This method can be used
+	 * Execute the given callable and return its result as a {@link CompletableFuture}. This method can be used
 	 * to run code outside of the main thread of a {@link RpcEndpoint}.
 	 *
 	 * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
@@ -158,5 +158,5 @@ public interface RpcService {
 	 * @param <T> is the return value type
 	 * @return Future containing the callable's future result
 	 */
-	<T> Future<T> execute(Callable<T> callable);
+	<T> CompletableFuture<T> execute(Callable<T> callable);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
index ed8ef9d..d39b1ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.rpc;
 
-import org.apache.flink.runtime.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Interface for self gateways
@@ -30,5 +30,5 @@ public interface SelfGateway {
 	 *
 	 * @return Future indicating when the rpc endpoint has been terminated
 	 */
-	Future<Void> getTerminationFuture();
+	CompletableFuture<Void> getTerminationFuture();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index c21accf..ae6b832 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -22,8 +22,7 @@ import akka.actor.ActorRef;
 import akka.pattern.Patterns;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.SelfGateway;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -44,7 +43,9 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.util.BitSet;
+import java.util.Objects;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -79,7 +80,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 	private final long maximumFramesize;
 
 	// null if gateway; otherwise non-null
-	private final Future<Void> terminationFuture;
+	private final CompletableFuture<Void> terminationFuture;
 
 	AkkaInvocationHandler(
 			String address,
@@ -87,7 +88,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 			ActorRef rpcEndpoint,
 			Time timeout,
 			long maximumFramesize,
-			Future<Void> terminationFuture) {
+			CompletableFuture<Void> terminationFuture) {
 
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
@@ -146,20 +147,19 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 
 			Class<?> returnType = method.getReturnType();
 
-			if (returnType.equals(Void.TYPE)) {
+			if (Objects.equals(returnType, Void.TYPE)) {
 				rpcEndpoint.tell(rpcInvocation, ActorRef.noSender());
 
 				result = null;
-			} else if (returnType.equals(Future.class)) {
+			} else if (Objects.equals(returnType,CompletableFuture.class)) {
 				// execute an asynchronous call
-				result = new FlinkFuture<>(Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds()));
+				result = FutureUtils.toJava(Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds()));
 			} else {
 				// execute a synchronous call
-				scala.concurrent.Future<?> scalaFuture = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds());
+				CompletableFuture<?> futureResult = FutureUtils.toJava(
+					Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds()));
 
-				Future<?> futureResult = new FlinkFuture<>(scalaFuture);
-
-				return futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
+				result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
 			}
 		}
 
@@ -191,12 +191,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 	}
 
 	@Override
-	public <V> Future<V> callAsync(Callable<V> callable, Time callTimeout) {
+	public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
 		if(isLocal) {
 			@SuppressWarnings("unchecked")
-			scala.concurrent.Future<V> result = (scala.concurrent.Future<V>) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout.toMilliseconds());
+			scala.concurrent.Future<V> resultFuture = (scala.concurrent.Future<V>) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout.toMilliseconds());
 
-			return new FlinkFuture<>(result);
+			return FutureUtils.toJava(resultFuture);
 		} else {
 			throw new RuntimeException("Trying to send a Callable to a remote actor at " +
 				rpcEndpoint.path() + ". This is not supported.");
@@ -331,7 +331,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
 	}
 
 	@Override
-	public Future<Void> getTerminationFuture() {
+	public CompletableFuture<Void> getTerminationFuture() {
 		return terminationFuture;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index fe3fcc9..5845473 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -21,12 +21,8 @@ package org.apache.flink.runtime.rpc.akka;
 import akka.actor.ActorRef;
 import akka.actor.Status;
 import akka.actor.UntypedActor;
-import akka.dispatch.Futures;
 import akka.japi.Procedure;
 import akka.pattern.Patterns;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -44,11 +40,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.impl.Promise;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -209,24 +207,20 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
 						return;
 					}
 
-					if (result instanceof Future) {
-						final Future<?> future = (Future<?>) result;
-
-						// pipe result to sender
-						if (future instanceof FlinkFuture) {
-							// FlinkFutures are currently backed by Scala's futures
-							FlinkFuture<?> flinkFuture = (FlinkFuture<?>) future;
-
-							Patterns.pipe(flinkFuture.getScalaFuture(), getContext().dispatcher()).to(getSender());
-						} else {
-							// We have to unpack the Flink future and pack it into a Scala future
-							Patterns.pipe(Futures.future(new Callable<Object>() {
-								@Override
-								public Object call() throws Exception {
-									return future.get();
+					if (result instanceof CompletableFuture) {
+						final CompletableFuture<?> future = (CompletableFuture<?>) result;
+						Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>();
+
+						future.whenComplete(
+							(value, throwable) -> {
+								if (throwable != null) {
+									promise.failure(throwable);
+								} else {
+									promise.success(value);
 								}
-							}, getContext().dispatcher()), getContext().dispatcher());
-						}
+							});
+
+						Patterns.pipe(promise.future(), getContext().dispatcher()).to(getSender());
 					} else {
 						// tell the sender the result of the computation
 						getSender().tell(new Status.Success(result), getSelf());

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 2f02e8c..80267f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -31,11 +31,8 @@ import akka.dispatch.Mapper;
 import akka.pattern.Patterns;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -57,6 +54,7 @@ import java.lang.reflect.Proxy;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.Executor;
 import java.util.concurrent.FutureTask;
@@ -133,7 +131,7 @@ public class AkkaRpcService implements RpcService {
 
 	// this method does not mutate state and is thus thread-safe
 	@Override
-	public <C extends RpcGateway> Future<C> connect(final String address, final Class<C> clazz) {
+	public <C extends RpcGateway> CompletableFuture<C> connect(final String address, final Class<C> clazz) {
 		checkState(!stopped, "RpcService is stopped");
 
 		LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
@@ -186,14 +184,14 @@ public class AkkaRpcService implements RpcService {
 			}
 		}, actorSystem.dispatcher());
 
-		return new FlinkFuture<>(resultFuture);
+		return FutureUtils.toJava(resultFuture);
 	}
 
 	@Override
 	public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
 		checkNotNull(rpcEndpoint, "rpc endpoint");
 
-		CompletableFuture<Void> terminationFuture = new FlinkCompletableFuture<>();
+		CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
 		Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);
 		ActorRef actorRef;
 
@@ -283,14 +281,10 @@ public class AkkaRpcService implements RpcService {
 	}
 
 	@Override
-	public Future<Void> getTerminationFuture() {
-		return FlinkFuture.supplyAsync(new Callable<Void>(){
-			@Override
-			public Void call() throws Exception {
-				actorSystem.awaitTermination();
-				return null;
-			}
-		}, getExecutor());
+	public CompletableFuture<Void> getTerminationFuture() {
+		return CompletableFuture.runAsync(
+			actorSystem::awaitTermination,
+			getExecutor());
 	}
 
 	@Override
@@ -317,10 +311,10 @@ public class AkkaRpcService implements RpcService {
 	}
 
 	@Override
-	public <T> Future<T> execute(Callable<T> callable) {
+	public <T> CompletableFuture<T> execute(Callable<T> callable) {
 		scala.concurrent.Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher());
 
-		return new FlinkFuture<>(scalaFuture);
+		return FutureUtils.toJava(scalaFuture);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 71933fe..2ebf3c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -379,8 +378,8 @@ public class JobLeaderService {
 		protected CompletableFuture<RegistrationResponse> invokeRegistration(
 				JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
 		{
-			return FutureUtils.toJava(gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation,
-					leaderId, Time.milliseconds(timeoutMillis)));
+			return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation,
+					leaderId, Time.milliseconds(timeoutMillis));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index aa4d6d2..effa498 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -768,12 +767,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 					reservedSlots.add(offer);
 				}
 
-				CompletableFuture<Iterable<SlotOffer>> acceptedSlotsFuture = FutureUtils.toJava(
-					jobMasterGateway.offerSlots(
-						getResourceID(),
-						reservedSlots,
-						leaderId,
-						taskManagerConfiguration.getTimeout()));
+				CompletableFuture<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
+					getResourceID(),
+					reservedSlots,
+					leaderId,
+					taskManagerConfiguration.getTimeout());
 
 				acceptedSlotsFuture.whenCompleteAsync(
 					(Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> {
@@ -985,8 +983,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	{
 		final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
 
-		CompletableFuture<Acknowledge> futureAcknowledge = FutureUtils.toJava(
-			jobMasterGateway.updateTaskExecutionState(jobMasterLeaderId, taskExecutionState));
+		CompletableFuture<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(jobMasterLeaderId, taskExecutionState);
 
 		futureAcknowledge.whenCompleteAsync(
 			(ack, throwable) -> {
@@ -1348,10 +1345,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		@Override
 		public CompletableFuture<SlotReport> retrievePayload() {
-			return FutureUtils.toJava(
-				callAsync(
+			return callAsync(
 					() -> taskSlotTable.createSlotReport(getResourceID()),
-					taskManagerConfiguration.getTimeout()));
+					taskManagerConfiguration.getTimeout());
 		}
 	}
 }


[2/3] flink git commit: [FLINK-7334] [futures] Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index d4afdbd..8084154 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskmanager.Task;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * {@link TaskExecutor} RPC gateway interface
@@ -48,7 +48,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param resourceManagerLeaderId current leader id of the ResourceManager
 	 * @return answer to the slot request
 	 */
-	Future<Acknowledge> requestSlot(
+	CompletableFuture<Acknowledge> requestSlot(
 		SlotID slotId,
 		JobID jobId,
 		AllocationID allocationId,
@@ -64,7 +64,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param timeout of the submit operation
 	 * @return Future acknowledge of the successful operation
 	 */
-	Future<Acknowledge> submitTask(
+	CompletableFuture<Acknowledge> submitTask(
 		TaskDeploymentDescriptor tdd,
 		UUID leaderId,
 		@RpcTimeout Time timeout);
@@ -77,7 +77,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param timeout for the update partitions operation
 	 * @return Future acknowledge if the partitions have been successfully updated
 	 */
-	Future<Acknowledge> updatePartitions(
+	CompletableFuture<Acknowledge> updatePartitions(
 		ExecutionAttemptID executionAttemptID,
 		Iterable<PartitionInfo> partitionInfos,
 		@RpcTimeout Time timeout);
@@ -99,7 +99,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param checkpointOptions for performing the checkpoint
 	 * @return Future acknowledge if the checkpoint has been successfully triggered
 	 */
-	Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
+	CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
 
 	/**
 	 * Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
@@ -110,7 +110,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
 	 * @return Future acknowledge if the checkpoint has been successfully confirmed
 	 */
-	Future<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
+	CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
 
 	/**
 	 * Stop the given task.
@@ -119,7 +119,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param timeout for the stop operation
 	 * @return Future acknowledge if the task is successfully stopped
 	 */
-	Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
+	CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
 
 	/**
 	 * Cancel the given task.
@@ -128,7 +128,7 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param timeout for the cancel operation
 	 * @return Future acknowledge if the task is successfully canceled
 	 */
-	Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
+	CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
 
 	/**
 	 * Heartbeat request from the job manager

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 4f91166..4084d67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationConnectionListener;
@@ -156,7 +155,7 @@ public class TaskExecutorToResourceManagerConnection
 				ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
 
 			Time timeout = Time.milliseconds(timeoutMillis);
-			return FutureUtils.toJava(resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout));
+			return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 78b49ef..b077b76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -52,6 +51,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -162,7 +162,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 	}
 
 	// export the termination future for caller to know it is terminated
-	public Future<Void> getTerminationFuture() {
+	public CompletableFuture<Void> getTerminationFuture() {
 		return taskManager.getTerminationFuture();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
index 3b9da48..a919c78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskexecutor.rpc;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -32,6 +31,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 public class RpcInputSplitProvider implements InputSplitProvider {
 	private final UUID jobMasterLeaderId;
@@ -61,7 +61,7 @@ public class RpcInputSplitProvider implements InputSplitProvider {
 	public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
 		Preconditions.checkNotNull(userCodeClassLoader);
 
-		Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
+		CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
 				jobMasterLeaderId, jobVertexID, executionAttemptID);
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
index 07d04e6..26e1b0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.taskexecutor.rpc;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -46,6 +45,6 @@ public class RpcPartitionStateChecker implements PartitionProducerStateChecker {
 			IntermediateDataSetID resultId,
 			ResultPartitionID partitionId) {
 
-		return FutureUtils.toJava(jobMasterGateway.requestPartitionState(jobMasterLeaderId, resultId, partitionId));
+		return jobMasterGateway.requestPartitionState(jobMasterLeaderId, resultId, partitionId);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
index cf01d5a..d898562 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.taskexecutor.rpc;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -32,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 public class RpcResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
@@ -55,18 +54,17 @@ public class RpcResultPartitionConsumableNotifier implements ResultPartitionCons
 	}
 	@Override
 	public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
-		Future<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(
+		CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(
 				jobMasterLeaderId, partitionId, timeout);
 
-		acknowledgeFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-			@Override
-			public Void apply(Throwable value) {
-				LOG.error("Could not schedule or update consumers at the JobManager.", value);
+		acknowledgeFuture.whenCompleteAsync(
+			(Acknowledge ack, Throwable throwable) -> {
+				if (throwable != null) {
+					LOG.error("Could not schedule or update consumers at the JobManager.", throwable);
 
-				taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", value));
-
-				return null;
-			}
-		}, executor);
+					taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable));
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a6712ad..ef4fa86 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -22,7 +22,7 @@ import java.io.IOException
 import java.net._
 import java.util.UUID
 import java.util.concurrent.{Future => JavaFuture, _}
-import java.util.function.BiFunction
+import java.util.function.{BiFunction, Consumer}
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
@@ -1105,17 +1105,18 @@ class JobManager(
 
       val originalSender = new AkkaActorGateway(sender(), leaderSessionID.orNull)
 
-      val sendingFuture = stackTraceFuture.thenAccept(new AcceptFunction[StackTrace] {
-        override def accept(value: StackTrace): Unit = {
-          originalSender.tell(value)
-        }
-      })
+      val sendingFuture = stackTraceFuture.thenAccept(
+        new Consumer[StackTrace]() {
+          override def accept(value: StackTrace): Unit = {
+            originalSender.tell(value)
+          }
+        })
 
-      sendingFuture.exceptionally(new ApplyFunction[Throwable, Void] {
+      sendingFuture.exceptionally(new java.util.function.Function[Throwable, Void] {
         override def apply(value: Throwable): Void = {
           log.info("Could not send requested stack trace.", value)
 
-          return null
+          null
         }
       })
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
index 09e829e..37a4547 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.akka;
 
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -43,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -125,7 +123,7 @@ public class QuarantineMonitorTest extends TestLogger {
 			// start watching the watchee
 			watcher.tell(new Watch(watcheeAddress), ActorRef.noSender());
 
-			Future<String> quarantineFuture = quarantineHandler.getWasQuarantinedByFuture();
+			CompletableFuture<String> quarantineFuture = quarantineHandler.getWasQuarantinedByFuture();
 
 			Assert.assertEquals(actorSystem1Address.toString(), quarantineFuture.get());
 		} finally {
@@ -166,7 +164,7 @@ public class QuarantineMonitorTest extends TestLogger {
 			// start watching the watchee
 			watcher.tell(new Watch(watcheeAddress), ActorRef.noSender());
 
-			Future<String> quarantineFuture = quarantineHandler.getHasQuarantinedFuture();
+			CompletableFuture<String> quarantineFuture = quarantineHandler.getHasQuarantinedFuture();
 
 			Assert.assertEquals(actorSystem1Address.toString(), quarantineFuture.get());
 		} finally {
@@ -182,8 +180,8 @@ public class QuarantineMonitorTest extends TestLogger {
 		private final CompletableFuture<String> hasQuarantinedFuture;
 
 		public TestingQuarantineHandler() {
-			this.wasQuarantinedByFuture = new FlinkCompletableFuture<>();
-			this.hasQuarantinedFuture = new FlinkCompletableFuture<>();
+			this.wasQuarantinedByFuture = new CompletableFuture<>();
+			this.hasQuarantinedFuture = new CompletableFuture<>();
 		}
 
 		@Override
@@ -196,11 +194,11 @@ public class QuarantineMonitorTest extends TestLogger {
 			hasQuarantinedFuture.complete(remoteSystem);
 		}
 
-		public Future<String> getWasQuarantinedByFuture() {
+		public CompletableFuture<String> getWasQuarantinedByFuture() {
 			return wasQuarantinedByFuture;
 		}
 
-		public Future<String> getHasQuarantinedFuture() {
+		public CompletableFuture<String> getHasQuarantinedFuture() {
 			return hasQuarantinedFuture;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 5df5c58..e23f6a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -339,10 +338,10 @@ public class CheckpointCoordinatorMasterHooksTest {
 		final MasterTriggerRestoreHook<Void> hook = mockGeneric(MasterTriggerRestoreHook.class);
 		when(hook.getIdentifier()).thenReturn(id);
 		when(hook.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class))).thenAnswer(
-				new Answer<Future<Void>>() {
+				new Answer<CompletableFuture<Void>>() {
 
 					@Override
-					public Future<Void> answer(InvocationOnMock invocation) throws Throwable {
+					public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
 						assertEquals(1, cc.getNumberOfPendingCheckpoints());
 
 						long checkpointId = (Long) invocation.getArguments()[0];

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index f1bc43b..e1144c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
 import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
 import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
@@ -71,6 +70,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -524,7 +524,7 @@ public class ResourceManagerTest extends TestLogger {
 
 			final SlotReport slotReport = new SlotReport();
 			// test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time
-			Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(
+			CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(
 				rmLeaderSessionId,
 				taskManagerAddress,
 				taskManagerResourceID,
@@ -622,7 +622,7 @@ public class ResourceManagerTest extends TestLogger {
 			rmLeaderElectionService.isLeader(rmLeaderId);
 
 			// test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time
-			Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
+			CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
 				rmLeaderId,
 				jmLeaderId,
 				jmResourceId,

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index a7a86c3..267f10b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
@@ -42,6 +41,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.concurrent.CompletableFuture;
+
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -86,7 +87,7 @@ public class DispatcherTest extends TestLogger {
 
 			DispatcherGateway dispatcherGateway = dispatcher.getSelf();
 
-			Future<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+			CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
 
 			acknowledgeFuture.get();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index bfcab87..f4e8b30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -120,7 +119,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 			when(rootSlot.getSlotNumber()).thenReturn(0);
 
-			when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+			when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 			TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index d3086a8..b88a928 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -576,7 +575,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 	private static TaskManagerGateway createTaskManager() {
 		TaskManagerGateway tm = mock(TaskManagerGateway.class);
 		when(tm.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
-				.thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+				.thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		return tm;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
index effe417..de9081b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.StoppingException;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -37,6 +36,8 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.concurrent.CompletableFuture;
+
 import static org.junit.Assert.assertEquals;
 
 import static org.junit.Assert.fail;
@@ -155,9 +156,9 @@ public class ExecutionGraphStopTest extends TestLogger {
 
 		final TaskManagerGateway gateway = mock(TaskManagerGateway.class);
 		when(gateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
-				.thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+				.thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 		when(gateway.stopTask(any(ExecutionAttemptID.class), any(Time.class)))
-				.thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+				.thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
index f453d20..be68532 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
@@ -19,14 +19,15 @@
 package org.apache.flink.runtime.executiongraph.utils;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.messages.Acknowledge;
 
+import java.util.concurrent.CompletableFuture;
+
 public class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
 
 	@Override
-	public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
-		return new FlinkCompletableFuture<>();
+	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return new CompletableFuture<>();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 6e67c1a..b968d39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * A TaskManagerGateway that simply acks the basic operations (deploy, cancel, update) and does not
@@ -56,39 +56,39 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 	public void stopCluster(ApplicationStatus applicationStatus, String message) {}
 
 	@Override
-	public Future<StackTrace> requestStackTrace(Time timeout) {
-		return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+	public CompletableFuture<StackTrace> requestStackTrace(Time timeout) {
+		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
 
 	@Override
-	public Future<StackTraceSampleResponse> requestStackTraceSample(
+	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
 			ExecutionAttemptID executionAttemptID,
 			int sampleId,
 			int numSamples,
 			Time delayBetweenSamples,
 			int maxStackTraceDepth,
 			Time timeout) {
-		return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
 
 	@Override
-	public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
-		return FlinkCompletableFuture.completed(Acknowledge.get());
+	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	@Override
-	public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
-		return FlinkCompletableFuture.completed(Acknowledge.get());
+	public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	@Override
-	public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
-		return FlinkCompletableFuture.completed(Acknowledge.get());
+	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	@Override
-	public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
-		return FlinkCompletableFuture.completed(Acknowledge.get());
+	public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 
 	@Override
@@ -110,12 +110,12 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 			CheckpointOptions checkpointOptions) {}
 
 	@Override
-	public Future<BlobKey> requestTaskManagerLog(Time timeout) {
-		return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+	public CompletableFuture<BlobKey> requestTaskManagerLog(Time timeout) {
+		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
 
 	@Override
-	public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
-		return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+	public CompletableFuture<BlobKey> requestTaskManagerStdout(Time timeout) {
+		return FutureUtils.completedExceptionally(new UnsupportedOperationException());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index b444640..4cc4f11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -35,6 +34,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -82,7 +82,7 @@ public class SlotPoolRpcTest {
 		);
 		pool.start(UUID.randomUUID(), "foobar");
 
-		Future<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
 
 		try {
 			future.get(4, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index cf95461..3e2293b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -41,6 +40,7 @@ import org.mockito.ArgumentCaptor;
 
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
@@ -85,7 +85,7 @@ public class SlotPoolTest extends TestLogger {
 		this.resourceManagerGateway = mock(ResourceManagerGateway.class);
 		when(resourceManagerGateway
 			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
-			.thenReturn(mock(Future.class, RETURNS_MOCKS));
+			.thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 
 		slotPool.connectToResourceManager(UUID.randomUUID(), resourceManagerGateway);
 	}
@@ -101,7 +101,7 @@ public class SlotPoolTest extends TestLogger {
 		slotPool.registerTaskManager(resourceID);
 
 		ScheduledUnit task = mock(ScheduledUnit.class);
-		Future<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null);
 		assertFalse(future.isDone());
 
 		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -126,8 +126,8 @@ public class SlotPoolTest extends TestLogger {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);
 
-		Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
-		Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
 
 		assertFalse(future1.isDone());
 		assertFalse(future2.isDone());
@@ -165,7 +165,7 @@ public class SlotPoolTest extends TestLogger {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);
 
-		Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
 		assertFalse(future1.isDone());
 
 		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -182,7 +182,7 @@ public class SlotPoolTest extends TestLogger {
 		// return this slot to pool
 		slot1.releaseSlot();
 
-		Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
 
 		// second allocation fulfilled by previous slot returning
 		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
@@ -200,7 +200,7 @@ public class SlotPoolTest extends TestLogger {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);
 
-		Future<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
 		assertFalse(future.isDone());
 
 		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -241,14 +241,14 @@ public class SlotPoolTest extends TestLogger {
 		ResourceID resourceID = new ResourceID("resource");
 		slotPool.registerTaskManager(resourceID);
 
-		Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
 
 		ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
 		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
 
 		final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
 
-		Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+		CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
 
 		AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
 		assertTrue(slotPool.offerSlot(allocatedSlot));

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 0b25e6c..48a1d45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -34,7 +33,6 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -50,6 +48,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.net.InetAddress;
 import java.net.URL;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -177,7 +176,7 @@ public class JobMasterTest extends TestLogger {
 			anyString(),
 			any(JobID.class),
 			any(Time.class)
-		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JobMasterRegistrationSuccess(
+		)).thenReturn(CompletableFuture.completedFuture(new JobMasterRegistrationSuccess(
 			heartbeatInterval, rmLeaderId, rmResourceId)));
 
 		final TestingSerialRpcService rpc = new TestingSerialRpcService();

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 9a4917a..da992bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.registration;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.util.TestLogger;
@@ -123,8 +122,8 @@ public class RetryingRegistrationTest extends TestLogger {
 			// RPC service that fails upon the first connection, but succeeds on the second
 			RpcService rpc = mock(RpcService.class);
 			when(rpc.connect(anyString(), any(Class.class))).thenReturn(
-					FlinkCompletableFuture.completedExceptionally(new Exception("test connect failure")),  // first connection attempt fails
-					FlinkCompletableFuture.completed(testGateway)                         // second connection attempt succeeds
+					FutureUtils.completedExceptionally(new Exception("test connect failure")),  // first connection attempt fails
+					CompletableFuture.completedFuture(testGateway)                         // second connection attempt succeeds
 			);
 			when(rpc.getExecutor()).thenReturn(executor);
 
@@ -245,8 +244,8 @@ public class RetryingRegistrationTest extends TestLogger {
 			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
 
 			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(
-					FlinkCompletableFuture.<RegistrationResponse>completedExceptionally(new Exception("test exception")),
-					FlinkCompletableFuture.<RegistrationResponse>completed(new TestRegistrationSuccess(testId)));
+					FutureUtils.completedExceptionally(new Exception("test exception")),
+					CompletableFuture.completedFuture(new TestRegistrationSuccess(testId)));
 
 			rpc.registerGateway(testEndpointAddress, testGateway);
 
@@ -281,7 +280,7 @@ public class RetryingRegistrationTest extends TestLogger {
 		TestingRpcService rpc = new TestingRpcService();
 
 		try {
-			FlinkCompletableFuture<RegistrationResponse> result = new FlinkCompletableFuture<>();
+			CompletableFuture<RegistrationResponse> result = new CompletableFuture<>();
 
 			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
 			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result);
@@ -340,7 +339,7 @@ public class RetryingRegistrationTest extends TestLogger {
 		@Override
 		protected CompletableFuture<RegistrationResponse> invokeRegistration(
 				TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
-			return FutureUtils.toJava(gateway.registrationCall(leaderId, timeoutMillis));
+			return gateway.registrationCall(leaderId, timeoutMillis);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
index 1b23fa3..4cfbc12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.runtime.registration;
 
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.TestingGatewayBase;
 import org.apache.flink.util.Preconditions;
 
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
@@ -47,7 +46,7 @@ public class TestRegistrationGateway extends TestingGatewayBase {
 
 	// ------------------------------------------------------------------------
 
-	public Future<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
+	public CompletableFuture<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
 		invocations.add(new RegistrationCall(leaderId, timeout));
 
 		RegistrationResponse response = responses[pos];
@@ -56,7 +55,7 @@ public class TestRegistrationGateway extends TestingGatewayBase {
 		}
 
 		// return a completed future (for a proper value), or one that never completes and will time out (for null)
-		return response != null ? FlinkCompletableFuture.completed(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
+		return response != null ? CompletableFuture.completedFuture(response) : futureWithTimeout(timeout);
 	}
 
 	public BlockingQueue<RegistrationCall> getInvocations() {

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
index 4d5964e..7b8703e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
@@ -34,6 +33,7 @@ import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -83,7 +83,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
 
 		jobLeaderIdService.addJob(jobId);
 
-		Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+		CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
 
 		// notify the leader id service about the new leader
 		leaderRetrievalService.notifyListener(address, leaderId);
@@ -117,7 +117,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
 
 		jobLeaderIdService.addJob(jobId);
 
-		Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+		CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
 
 		// remove the job before we could find a leader
 		jobLeaderIdService.removeJob(jobId);
@@ -228,7 +228,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
 
 		jobLeaderIdService.addJob(jobId);
 
-		Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+		CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
 
 		// notify the leader id service about the new leader
 		leaderRetrievalService.notifyListener(address, leaderId);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 4836f74..6480d75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -43,6 +42,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertTrue;
@@ -74,11 +74,11 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
 		// test response successful
-		Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
 			rmLeaderSessionId,
 			jmLeaderID,
 			jmResourceId,
@@ -104,12 +104,12 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
-		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
 			differentLeaderSessionID,
 			jmLeaderID,
 			jmResourceId,
@@ -134,14 +134,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			"localhost",
 			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
-		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
 			rmLeaderSessionId,
 			differentLeaderSessionID,
 			jmResourceId,
@@ -166,14 +166,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			"localhost",
 			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		// test throw exception when receive a registration from job master which takes invalid address
 		String invalidAddress = "/jobMasterAddress2";
-		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(
 			rmLeaderSessionId,
 			jmLeaderSessionId,
 			jmResourceId,
@@ -198,14 +198,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			"localhost",
 			HighAvailabilityServices.DEFAULT_LEADER_ID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		JobID unknownJobIDToHAServices = new JobID();
 		// verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener
-		Future<RegistrationResponse> declineFuture = resourceManager.registerJobManager(
+		CompletableFuture<RegistrationResponse> declineFuture = resourceManager.registerJobManager(
 			rmLeaderSessionId,
 			jmLeaderSessionId,
 			jmResourceId,

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 85b7eb4..4127cea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -41,6 +40,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertNotEquals;
@@ -89,13 +89,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	public void testRegisterTaskExecutor() throws Exception {
 		try {
 			// test response successful
-			Future<RegistrationResponse> successfulFuture =
+			CompletableFuture<RegistrationResponse> successfulFuture =
 				resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
 			RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 
 			// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
-			Future<RegistrationResponse> duplicateFuture =
+			CompletableFuture<RegistrationResponse> duplicateFuture =
 				resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
 			RegistrationResponse duplicateResponse = duplicateFuture.get();
 			assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
@@ -115,7 +115,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
 			UUID differentLeaderSessionID = UUID.randomUUID();
-			Future<RegistrationResponse> unMatchedLeaderFuture =
+			CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
 				resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport);
 			assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 		} finally {
@@ -133,7 +133,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test throw exception when receive a registration from taskExecutor which takes invalid address
 			String invalidAddress = "/taskExecutor2";
-			Future<RegistrationResponse> invalidAddressFuture =
+			CompletableFuture<RegistrationResponse> invalidAddressFuture =
 				resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport);
 			assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 		} finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 39c5f25..93e96a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
-import org.apache.flink.runtime.concurrent.*;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -44,7 +44,7 @@ import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
 import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -114,7 +114,7 @@ public class SlotManagerTest extends TestLogger {
 			any(AllocationID.class),
 			anyString(),
 			eq(leaderId),
-			any(Time.class))).thenReturn(new FlinkCompletableFuture<Acknowledge>());
+			any(Time.class))).thenReturn(new CompletableFuture<>());
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -241,7 +241,7 @@ public class SlotManagerTest extends TestLogger {
 				eq(allocationId),
 				anyString(),
 				eq(leaderId),
-				any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+				any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 			final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -280,7 +280,7 @@ public class SlotManagerTest extends TestLogger {
 			any(AllocationID.class),
 			anyString(),
 			eq(leaderId),
-			any(Time.class))).thenReturn(new FlinkCompletableFuture<Acknowledge>());
+			any(Time.class))).thenReturn(new CompletableFuture<>());
 
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
@@ -338,7 +338,7 @@ public class SlotManagerTest extends TestLogger {
 			eq(allocationId),
 			anyString(),
 			eq(leaderId),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -482,7 +482,7 @@ public class SlotManagerTest extends TestLogger {
 			any(AllocationID.class),
 			anyString(),
 			eq(leaderId),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -527,7 +527,7 @@ public class SlotManagerTest extends TestLogger {
 			any(AllocationID.class),
 			anyString(),
 			eq(leaderId),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -747,8 +747,8 @@ public class SlotManagerTest extends TestLogger {
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
 		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
-		final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = new FlinkCompletableFuture<>();
-		final FlinkCompletableFuture<Acknowledge> slotRequestFuture2 = new FlinkCompletableFuture<>();
+		final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
+		final CompletableFuture<Acknowledge> slotRequestFuture2 = new CompletableFuture<>();
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 		when(taskExecutorGateway.requestSlot(
@@ -826,7 +826,7 @@ public class SlotManagerTest extends TestLogger {
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
 		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
-		final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = new FlinkCompletableFuture<>();
+		final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 		when(taskExecutorGateway.requestSlot(
@@ -835,7 +835,7 @@ public class SlotManagerTest extends TestLogger {
 			eq(allocationId),
 			anyString(),
 			any(UUID.class),
-			any(Time.class))).thenReturn(slotRequestFuture1, FlinkCompletableFuture.completed(Acknowledge.get()));
+			any(Time.class))).thenReturn(slotRequestFuture1, CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -856,24 +856,21 @@ public class SlotManagerTest extends TestLogger {
 
 			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
 
-			Future<Void> registrationFuture = FlinkFuture.supplyAsync(new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
+			CompletableFuture<Void> registrationFuture = CompletableFuture.supplyAsync(
+				() -> {
 					slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
 					return null;
-				}
-			}, mainThreadExecutor)
-			.thenAccept(new AcceptFunction<Void>() {
-				@Override
-				public void accept(Void value) {
+				},
+				mainThreadExecutor)
+			.thenAccept(
+				(Object value) -> {
 					try {
 						slotManager.registerSlotRequest(slotRequest);
 					} catch (SlotManagerException e) {
 						throw new RuntimeException("Could not register slots.", e);
 					}
-				}
-			});
+				});
 
 			// check that no exception has been thrown
 			registrationFuture.get();
@@ -891,12 +888,9 @@ public class SlotManagerTest extends TestLogger {
 			final SlotID requestedSlotId = slotIdCaptor.getValue();
 			final SlotID freeSlotId = requestedSlotId.equals(slotId1) ? slotId2 : slotId1;
 
-			Future<Boolean> freeSlotFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() {
-				@Override
-				public Boolean call() throws Exception {
-					return slotManager.getSlot(freeSlotId).isFree();
-				}
-			}, mainThreadExecutor);
+			CompletableFuture<Boolean> freeSlotFuture = CompletableFuture.supplyAsync(
+				() -> slotManager.getSlot(freeSlotId).isFree(),
+				mainThreadExecutor);
 
 			assertTrue(freeSlotFuture.get());
 
@@ -904,15 +898,10 @@ public class SlotManagerTest extends TestLogger {
 			final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
 			final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
 
-			FlinkFuture.supplyAsync(new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
-					// this should update the slot with the pending slot request triggering the reassignment of it
-					slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport);
-
-					return null;
-				}
-			}, mainThreadExecutor);
+			CompletableFuture.supplyAsync(
+				// this should update the slot with the pending slot request triggering the reassignment of it
+				() -> slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport),
+				mainThreadExecutor);
 
 			verify(taskExecutorGateway, timeout(verifyTimeout).times(2)).requestSlot(
 				slotIdCaptor.capture(),
@@ -926,12 +915,9 @@ public class SlotManagerTest extends TestLogger {
 
 			assertEquals(slotId2, requestedSlotId2);
 
-			Future<TaskManagerSlot> requestedSlotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
-				@Override
-				public TaskManagerSlot call() throws Exception {
-					return slotManager.getSlot(requestedSlotId2);
-				}
-			}, mainThreadExecutor);
+			CompletableFuture<TaskManagerSlot> requestedSlotFuture = CompletableFuture.supplyAsync(
+				() -> slotManager.getSlot(requestedSlotId2),
+				mainThreadExecutor);
 
 			TaskManagerSlot slot = requestedSlotFuture.get();
 
@@ -967,7 +953,7 @@ public class SlotManagerTest extends TestLogger {
 			eq(allocationId),
 			anyString(),
 			eq(leaderId),
-			any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
@@ -987,20 +973,16 @@ public class SlotManagerTest extends TestLogger {
 
 			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
 
-			FlinkFuture.supplyAsync(new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
-					slotManager.registerSlotRequest(slotRequest);
-
-					return null;
-				}
-			}, mainThreadExecutor)
-			.thenAccept(new AcceptFunction<Void>() {
-				@Override
-				public void accept(Void value) {
-					slotManager.registerTaskManager(taskManagerConnection, initialSlotReport);
-				}
-			});
+			CompletableFuture.supplyAsync(
+				() -> {
+					try {
+						return slotManager.registerSlotRequest(slotRequest);
+					} catch (SlotManagerException e) {
+						throw new FlinkFutureException(e);
+					}
+				},
+				mainThreadExecutor)
+			.thenAccept((Object value) -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
 
 			ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class);
 
@@ -1012,44 +994,28 @@ public class SlotManagerTest extends TestLogger {
 				eq(leaderId),
 				any(Time.class));
 
-			Future<Boolean> idleFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() {
-				@Override
-				public Boolean call() throws Exception {
-					return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
-				}
-			}, mainThreadExecutor);
+			CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync(
+				() -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()),
+				mainThreadExecutor);
 
 			// check that the TaskManaer is not idle
 			assertFalse(idleFuture.get());
 
 			final SlotID slotId = slotIdArgumentCaptor.getValue();
 
-			Future<TaskManagerSlot> slotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
-				@Override
-				public TaskManagerSlot call() throws Exception {
-					return slotManager.getSlot(slotId);
-				}
-			}, mainThreadExecutor);
+			CompletableFuture<TaskManagerSlot> slotFuture = CompletableFuture.supplyAsync(
+				() -> slotManager.getSlot(slotId),
+				mainThreadExecutor);
 
 			TaskManagerSlot slot = slotFuture.get();
 
 			assertTrue(slot.isAllocated());
 			assertEquals(allocationId, slot.getAllocationId());
 
-			Future<Boolean> idleFuture2 = FlinkFuture.supplyAsync(new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
-					slotManager.freeSlot(slotId, allocationId);
-
-					return null;
-				}
-			}, mainThreadExecutor)
-			.thenApply(new ApplyFunction<Void, Boolean>() {
-				@Override
-				public Boolean apply(Void value) {
-					return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
-				}
-			});
+			CompletableFuture<Boolean> idleFuture2 = CompletableFuture.runAsync(
+				() -> slotManager.freeSlot(slotId, allocationId),
+				mainThreadExecutor)
+			.thenApply((Object value) -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()));
 
 			assertTrue(idleFuture2.get());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index a1ab1ab..844e159 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
@@ -41,6 +40,7 @@ import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -104,7 +104,7 @@ public class SlotProtocolTest extends TestLogger {
 			Mockito.when(
 				taskExecutorGateway
 					.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
-				.thenReturn(mock(FlinkFuture.class));
+				.thenReturn(mock(CompletableFuture.class));
 
 			final ResourceID resourceID = ResourceID.generate();
 			final SlotID slotID = new SlotID(resourceID, 0);
@@ -139,7 +139,7 @@ public class SlotProtocolTest extends TestLogger {
 		Mockito.when(
 			taskExecutorGateway
 				.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
-			.thenReturn(mock(FlinkFuture.class));
+			.thenReturn(mock(CompletableFuture.class));
 
 		try (SlotManager slotManager = new SlotManager(
 			scheduledExecutor,

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index e636d6c..4be5257 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -23,14 +23,13 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
 
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
@@ -90,9 +89,8 @@ public class AsyncCallsTest extends TestLogger {
 			});
 		}
 	
-		Future<String> result = testEndpoint.callAsync(new Callable<String>() {
-			@Override
-			public String call() throws Exception {
+		CompletableFuture<String> result = testEndpoint.callAsync(
+			() -> {
 				boolean holdsLock = lock.tryLock();
 				if (holdsLock) {
 					lock.unlock();
@@ -100,8 +98,8 @@ public class AsyncCallsTest extends TestLogger {
 					concurrentAccess.set(true);
 				}
 				return "test";
-			}
-		}, Time.seconds(30L));
+			},
+			Time.seconds(30L));
 
 		String str = result.get(30, TimeUnit.SECONDS);
 		assertEquals("test", str);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index bbde331..07dadae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.rpc;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.util.ReflectionUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -42,6 +41,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -74,7 +74,7 @@ public class RpcCompletenessTest extends TestLogger {
 
 	private static Logger LOG = LoggerFactory.getLogger(RpcCompletenessTest.class);
 
-	private static final Class<?> futureClass = Future.class;
+	private static final Class<?> futureClass = CompletableFuture.class;
 	private static final Class<?> timeoutClass = Time.class;
 
 	@Test
@@ -195,7 +195,7 @@ public class RpcCompletenessTest extends TestLogger {
 	/**
 	 * Checks whether the gateway method fulfills the gateway method requirements.
 	 * <ul>
-	 *     <li>It checks whether the return type is void or a {@link Future} wrapping the actual result. </li>
+	 *     <li>It checks whether the return type is void or a {@link CompletableFuture} wrapping the actual result. </li>
 	 *     <li>It checks that the method's parameter list contains at most one parameter annotated with {@link RpcTimeout}.</li>
 	 * </ul>
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index e05c8d8..4220fff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -23,7 +23,6 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -33,6 +32,7 @@ import org.junit.Test;
 import scala.Option;
 import scala.Tuple2;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -57,7 +57,7 @@ public class RpcConnectionTest {
 			// can only pass if the connection problem is not recognized merely via a timeout
 			rpcService = new AkkaRpcService(actorSystem, Time.of(10000000, TimeUnit.SECONDS));
 
-			Future<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
+			CompletableFuture<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
 
 			future.get(10000000, TimeUnit.SECONDS);
 			fail("should never complete normally");

http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
index 03fe84b..ccf0acd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.runtime.rpc;
 
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -77,8 +74,8 @@ public abstract class TestingGatewayBase implements RpcGateway {
 	//  utilities
 	// ------------------------------------------------------------------------
 
-	public <T> Future<T> futureWithTimeout(long timeoutMillis) {
-		FlinkCompletableFuture<T> future = new FlinkCompletableFuture<>();
+	public <T> CompletableFuture<T> futureWithTimeout(long timeoutMillis) {
+		CompletableFuture<T> future = new CompletableFuture<>();
 		executor.schedule(new FutureTimeout(future), timeoutMillis, TimeUnit.MILLISECONDS);
 		return future;
 	}