You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/03 11:49:03 UTC
[1/3] flink git commit: [FLINK-7334] [futures] Replace Flink's
futures with Java 8's CompletableFuture in RpcEndpoint,
RpcGateways and RpcService
Repository: flink
Updated Branches:
refs/heads/master 74e479e51 -> eddafc1ac
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index b56bf6b..14cf35a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.rpc;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -92,20 +92,19 @@ public class TestingRpcService extends AkkaRpcService {
}
@Override
- public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
+ public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
RpcGateway gateway = registeredConnections.get(address);
if (gateway != null) {
if (clazz.isAssignableFrom(gateway.getClass())) {
@SuppressWarnings("unchecked")
C typedGateway = (C) gateway;
- return FlinkCompletableFuture.completed(typedGateway);
+ return CompletableFuture.completedFuture(typedGateway);
} else {
- return FlinkCompletableFuture.completedExceptionally(
- new Exception("Gateway registered under " + address + " is not of type " + clazz));
+ return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
}
} else {
- return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
+ return FutureUtils.completedExceptionally(new Exception("No gateway registered under " + address + '.'));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index ac3f40b..37349a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -20,11 +20,9 @@ package org.apache.flink.runtime.rpc;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.util.Preconditions;
@@ -36,10 +34,12 @@ import java.util.BitSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -65,7 +65,7 @@ public class TestingSerialRpcService implements RpcService {
executorService = new DirectExecutorService();
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
this.registeredConnections = new ConcurrentHashMap<>(16);
- this.terminationFuture = new FlinkCompletableFuture<>();
+ this.terminationFuture = new CompletableFuture<>();
this.scheduledExecutorServiceAdapter = new ScheduledExecutorServiceAdapter(scheduledExecutorService);
}
@@ -88,13 +88,13 @@ public class TestingSerialRpcService implements RpcService {
}
@Override
- public <T> Future<T> execute(Callable<T> callable) {
+ public <T> CompletableFuture<T> execute(Callable<T> callable) {
try {
T result = callable.call();
- return FlinkCompletableFuture.completed(result);
+ return CompletableFuture.completedFuture(result);
} catch (Exception e) {
- return FlinkCompletableFuture.completedExceptionally(e);
+ return FutureUtils.completedExceptionally(e);
}
}
@@ -134,7 +134,7 @@ public class TestingSerialRpcService implements RpcService {
}
@Override
- public Future<Void> getTerminationFuture() {
+ public CompletableFuture<Void> getTerminationFuture() {
return terminationFuture;
}
@@ -178,20 +178,19 @@ public class TestingSerialRpcService implements RpcService {
}
@Override
- public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
+ public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
RpcGateway gateway = registeredConnections.get(address);
if (gateway != null) {
if (clazz.isAssignableFrom(gateway.getClass())) {
@SuppressWarnings("unchecked")
C typedGateway = (C) gateway;
- return FlinkCompletableFuture.completed(typedGateway);
+ return CompletableFuture.completedFuture(typedGateway);
} else {
- return FlinkCompletableFuture.completedExceptionally(
- new Exception("Gateway registered under " + address + " is not of type " + clazz));
+ return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
}
} else {
- return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name"));
+ return FutureUtils.completedExceptionally(new Exception("No gateway registered under " + address + '.'));
}
}
@@ -251,12 +250,12 @@ public class TestingSerialRpcService implements RpcService {
Class<?> returnType = method.getReturnType();
- if (returnType.equals(Future.class)) {
+ if (returnType.equals(CompletableFuture.class)) {
try {
Object result = handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
- return FlinkCompletableFuture.completed(result);
+ return CompletableFuture.completedFuture(result);
} catch (Throwable e) {
- return FlinkCompletableFuture.completedExceptionally(e);
+ return FutureUtils.completedExceptionally(e);
}
} else {
return handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, futureTimeout);
@@ -290,11 +289,11 @@ public class TestingSerialRpcService implements RpcService {
}
@Override
- public <V> Future<V> callAsync(Callable<V> callable, Time callTimeout) {
+ public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
try {
- return FlinkCompletableFuture.completed(callable.call());
+ return CompletableFuture.completedFuture(callable.call());
} catch (Throwable e) {
- return FlinkCompletableFuture.completedExceptionally(e);
+ return FutureUtils.completedExceptionally(e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 0b06267..793d292 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -21,9 +21,6 @@ package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
@@ -37,7 +34,7 @@ import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Test;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
@@ -74,7 +71,7 @@ public class AkkaRpcActorTest extends TestLogger {
public void testAddressResolution() throws Exception {
DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
- Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
+ CompletableFuture<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
DummyRpcGateway rpcGateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
@@ -86,7 +83,7 @@ public class AkkaRpcActorTest extends TestLogger {
*/
@Test
public void testFailingAddressResolution() throws Exception {
- Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
+ CompletableFuture<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class);
try {
futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
@@ -111,7 +108,7 @@ public class AkkaRpcActorTest extends TestLogger {
DummyRpcGateway rpcGateway = rpcEndpoint.getSelf();
// this message should be discarded and completed with an AkkaRpcException
- Future<Integer> result = rpcGateway.foobar();
+ CompletableFuture<Integer> result = rpcGateway.foobar();
try {
result.get(timeout.getSize(), timeout.getUnit());
@@ -150,14 +147,14 @@ public class AkkaRpcActorTest extends TestLogger {
rpcEndpoint.start();
- Future<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
+ CompletableFuture<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
WrongRpcGateway gateway = futureGateway.get(timeout.getSize(), timeout.getUnit());
// since it is a tell operation we won't receive a RpcConnectionException, it's only logged
gateway.tell("foobar");
- Future<Boolean> result = gateway.barfoo();
+ CompletableFuture<Boolean> result = gateway.barfoo();
try {
result.get(timeout.getSize(), timeout.getUnit());
@@ -178,18 +175,13 @@ public class AkkaRpcActorTest extends TestLogger {
final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
rpcEndpoint.start();
- Future<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
+ CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
assertFalse(terminationFuture.isDone());
- FlinkFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- rpcEndpoint.shutDown();
-
- return null;
- }
- }, actorSystem.dispatcher());
+ CompletableFuture.runAsync(
+ () -> rpcEndpoint.shutDown(),
+ actorSystem.dispatcher());
// wait until the rpc endpoint has terminated
terminationFuture.get();
@@ -201,7 +193,7 @@ public class AkkaRpcActorTest extends TestLogger {
rpcEndpoint.start();
ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
- Future<Integer> result = rpcGateway.doStuff();
+ CompletableFuture<Integer> result = rpcGateway.doStuff();
try {
result.get(timeout.getSize(), timeout.getUnit());
@@ -220,7 +212,7 @@ public class AkkaRpcActorTest extends TestLogger {
rpcEndpoint.start();
ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
- Future<Integer> result = rpcGateway.doStuff();
+ CompletableFuture<Integer> result = rpcGateway.doStuff();
try {
result.get(timeout.getSize(), timeout.getUnit());
@@ -244,7 +236,7 @@ public class AkkaRpcActorTest extends TestLogger {
rpcEndpoint.shutDown();
- Future<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
+ CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
try {
terminationFuture.get();
@@ -263,7 +255,7 @@ public class AkkaRpcActorTest extends TestLogger {
simpleRpcEndpoint.shutDown();
- Future<Void> terminationFuture = simpleRpcEndpoint.getTerminationFuture();
+ CompletableFuture<Void> terminationFuture = simpleRpcEndpoint.getTerminationFuture();
// check that we executed the postStop method in the main thread, otherwise an exception
// would be thrown here.
@@ -275,11 +267,11 @@ public class AkkaRpcActorTest extends TestLogger {
// ------------------------------------------------------------------------
private interface DummyRpcGateway extends RpcGateway {
- Future<Integer> foobar();
+ CompletableFuture<Integer> foobar();
}
private interface WrongRpcGateway extends RpcGateway {
- Future<Boolean> barfoo();
+ CompletableFuture<Boolean> barfoo();
void tell(String message);
}
@@ -304,7 +296,7 @@ public class AkkaRpcActorTest extends TestLogger {
// ------------------------------------------------------------------------
private interface ExceptionalGateway extends RpcGateway {
- Future<Integer> doStuff();
+ CompletableFuture<Integer> doStuff();
}
private static class ExceptionalEndpoint extends RpcEndpoint<ExceptionalGateway> {
@@ -326,8 +318,8 @@ public class AkkaRpcActorTest extends TestLogger {
}
@RpcMethod
- public Future<Integer> doStuff() {
- final FlinkCompletableFuture<Integer> future = new FlinkCompletableFuture<>();
+ public CompletableFuture<Integer> doStuff() {
+ final CompletableFuture<Integer> future = new CompletableFuture<>();
// complete the future slightly in the, well, future...
new Thread() {
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 42f63ef..e0d1110 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -22,15 +22,14 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Test;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
@@ -91,26 +90,21 @@ public class AkkaRpcServiceTest extends TestLogger {
public void testExecuteRunnable() throws Exception {
final OneShotLatch latch = new OneShotLatch();
- akkaRpcService.execute(new Runnable() {
- @Override
- public void run() {
- latch.trigger();
- }
- });
+ akkaRpcService.execute(() -> latch.trigger());
latch.await(30L, TimeUnit.SECONDS);
}
/**
* Tests that the {@link AkkaRpcService} can execute callables and returns their result as
- * a {@link Future}.
+ * a {@link CompletableFuture}.
*/
@Test
public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException {
final OneShotLatch latch = new OneShotLatch();
final int expected = 42;
- Future<Integer> result = akkaRpcService.execute(new Callable<Integer>() {
+ CompletableFuture<Integer> result = akkaRpcService.execute(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
latch.trigger();
@@ -145,18 +139,11 @@ public class AkkaRpcServiceTest extends TestLogger {
final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds(1000));
- Future<Void> terminationFuture = rpcService.getTerminationFuture();
+ CompletableFuture<Void> terminationFuture = rpcService.getTerminationFuture();
assertFalse(terminationFuture.isDone());
- FlinkFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- rpcService.stopService();
-
- return null;
- }
- }, actorSystem.dispatcher());
+ CompletableFuture.runAsync(() -> rpcService.stopService(), actorSystem.dispatcher());
terminationFuture.get();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index d640a97..34cf412 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -23,7 +23,6 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
@@ -35,6 +34,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import static org.junit.Assert.assertThat;
@@ -108,7 +108,7 @@ public class MessageSerializationTest extends TestLogger {
String address = testEndpoint.getAddress();
- Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
+ CompletableFuture<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
@@ -129,7 +129,7 @@ public class MessageSerializationTest extends TestLogger {
String address = testEndpoint.getAddress();
- Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
+ CompletableFuture<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
@@ -153,7 +153,7 @@ public class MessageSerializationTest extends TestLogger {
String address = testEndpoint.getAddress();
- Future<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
+ CompletableFuture<TestGateway> remoteGatewayFuture = akkaRpcService2.connect(address, TestGateway.class);
TestGateway remoteGateway = remoteGatewayFuture.get(timeout.getSize(), timeout.getUnit());
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 6a0bd87..53c435e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -26,8 +26,6 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -62,6 +60,7 @@ import org.junit.Test;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -159,13 +158,13 @@ public class TaskExecutorITCase extends TestLogger {
JobMasterGateway jmGateway = mock(JobMasterGateway.class);
when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
+ .thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
when(jmGateway.getHostname()).thenReturn(jmAddress);
when(jmGateway.offerSlots(
eq(taskManagerResourceId),
any(Iterable.class),
eq(jmLeaderId),
- any(Time.class))).thenReturn(mock(Future.class, RETURNS_MOCKS));
+ any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
rpcService.registerGateway(rmAddress, resourceManager.getSelf());
@@ -185,7 +184,7 @@ public class TaskExecutorITCase extends TestLogger {
// notify the TM about the new RM leader
rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId);
- Future<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(
rmLeaderId,
jmLeaderId,
jmResourceId,
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index b596f75..a4f0e03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -28,10 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -65,7 +62,6 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
@@ -95,6 +91,7 @@ import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
@@ -168,7 +165,7 @@ public class TaskExecutorTest extends TestLogger {
eq(taskManagerLocation),
eq(jmLeaderId),
any(Time.class)
- )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+ )).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
when(jobMasterGateway.getHostname()).thenReturn("localhost");
@@ -230,7 +227,7 @@ public class TaskExecutorTest extends TestLogger {
when(rmGateway.registerTaskExecutor(
any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
.thenReturn(
- FlinkCompletableFuture.<RegistrationResponse>completed(
+ CompletableFuture.completedFuture(
new TaskExecutorRegistrationSuccess(
new InstanceID(),
rmResourceId,
@@ -334,7 +331,7 @@ public class TaskExecutorTest extends TestLogger {
when(rmGateway.registerTaskExecutor(
any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
.thenReturn(
- FlinkCompletableFuture.<RegistrationResponse>completed(
+ CompletableFuture.completedFuture(
new TaskExecutorRegistrationSuccess(
new InstanceID(),
rmResourceId,
@@ -467,7 +464,7 @@ public class TaskExecutorTest extends TestLogger {
ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
when(rmGateway.registerTaskExecutor(
any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(
+ .thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
new InstanceID(), resourceManagerResourceId, 10L)));
TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
@@ -539,11 +536,11 @@ public class TaskExecutorTest extends TestLogger {
when(rmGateway1.registerTaskExecutor(
any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
+ .thenReturn(CompletableFuture.completedFuture(
new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
when(rmGateway2.registerTaskExecutor(
any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(
+ .thenReturn(CompletableFuture.completedFuture(
new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));
rpc.registerGateway(address1, rmGateway1);
@@ -728,7 +725,7 @@ public class TaskExecutorTest extends TestLogger {
taskManager.submitTask(tdd, jobManagerLeaderId);
- Future<Boolean> completionFuture = TestInvokable.completableFuture;
+ CompletableFuture<Boolean> completionFuture = TestInvokable.completableFuture;
completionFuture.get();
@@ -744,7 +741,7 @@ public class TaskExecutorTest extends TestLogger {
*/
public static class TestInvokable extends AbstractInvokable {
- static final CompletableFuture<Boolean> completableFuture = new FlinkCompletableFuture<>();
+ static final CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
@Override
public void invoke() throws Exception {
@@ -793,7 +790,7 @@ public class TaskExecutorTest extends TestLogger {
any(String.class),
eq(resourceId),
any(SlotReport.class),
- any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
+ any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
final String jobManagerAddress = "jm";
final UUID jobManagerLeaderId = UUID.randomUUID();
@@ -807,13 +804,13 @@ public class TaskExecutorTest extends TestLogger {
eq(taskManagerLocation),
eq(jobManagerLeaderId),
any(Time.class)
- )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+ )).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
when(jobMasterGateway.offerSlots(
any(ResourceID.class),
any(Iterable.class),
any(UUID.class),
- any(Time.class))).thenReturn(mock(Future.class, RETURNS_MOCKS));
+ any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
rpc.registerGateway(jobManagerAddress, jobMasterGateway);
@@ -906,7 +903,7 @@ public class TaskExecutorTest extends TestLogger {
any(String.class),
eq(resourceId),
any(SlotReport.class),
- any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
+ any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
final int blobPort = 42;
@@ -923,12 +920,12 @@ public class TaskExecutorTest extends TestLogger {
eq(taskManagerLocation),
eq(jobManagerLeaderId),
any(Time.class)
- )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+ )).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
when(jobMasterGateway.offerSlots(
any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.completed((Iterable<SlotOffer>)Collections.singleton(offer1)));
+ .thenReturn(CompletableFuture.completedFuture((Iterable<SlotOffer>)Collections.singleton(offer1)));
rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
rpc.registerGateway(jobManagerAddress, jobMasterGateway);
@@ -1120,7 +1117,7 @@ public class TaskExecutorTest extends TestLogger {
eq(resourceId),
any(SlotReport.class),
any(Time.class))).thenReturn(
- FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
+ CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));
final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
final int blobPort = 42;
@@ -1137,7 +1134,7 @@ public class TaskExecutorTest extends TestLogger {
eq(taskManagerLocation),
eq(jobManagerLeaderId),
any(Time.class)
- )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+ )).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
@@ -1224,7 +1221,7 @@ public class TaskExecutorTest extends TestLogger {
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
Collections.<InputGateDeploymentDescriptor>emptyList());
- CompletableFuture<Iterable<SlotOffer>> offerResultFuture = new FlinkCompletableFuture<>();
+ CompletableFuture<Iterable<SlotOffer>> offerResultFuture = new CompletableFuture<>();
// submit task first and then return acceptance response
when(
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index e790ea8..fdae251 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -35,8 +35,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -110,6 +108,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
@@ -1612,7 +1611,7 @@ public class TaskManagerTest extends TestLogger {
Await.result(result, timeout);
- org.apache.flink.runtime.concurrent.Future<Boolean> cancelFuture = TestInvokableRecordCancel.gotCanceled();
+ CompletableFuture<Boolean> cancelFuture = TestInvokableRecordCancel.gotCanceled();
assertEquals(true, cancelFuture.get());
} finally {
@@ -2070,7 +2069,7 @@ public class TaskManagerTest extends TestLogger {
public static final class TestInvokableRecordCancel extends AbstractInvokable {
private static final Object lock = new Object();
- private static CompletableFuture<Boolean> gotCanceledFuture = new FlinkCompletableFuture<>();
+ private static CompletableFuture<Boolean> gotCanceledFuture = new CompletableFuture<>();
@Override
public void invoke() throws Exception {
@@ -2099,11 +2098,11 @@ public class TaskManagerTest extends TestLogger {
public static void resetGotCanceledFuture() {
synchronized (lock) {
- gotCanceledFuture = new FlinkCompletableFuture<>();
+ gotCanceledFuture = new CompletableFuture<>();
}
}
- public static org.apache.flink.runtime.concurrent.Future<Boolean> gotCanceled() {
+ public static CompletableFuture<Boolean> gotCanceled() {
synchronized (lock) {
return gotCanceledFuture;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
index 5832b89..f3b68c4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
@@ -18,8 +18,7 @@
package org.apache.flink.streaming.api.operators.async.queue;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -34,7 +33,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -92,17 +91,20 @@ public class OrderedStreamElementQueueTest extends TestLogger {
queue.put(entry);
}
- Future<List<AsyncResult>> pollOperation = FlinkFuture.supplyAsync(new Callable<List<AsyncResult>>() {
- @Override
- public List<AsyncResult> call() throws Exception {
+ CompletableFuture<List<AsyncResult>> pollOperation = CompletableFuture.supplyAsync(
+ () -> {
List<AsyncResult> result = new ArrayList<>(4);
while (!queue.isEmpty()) {
- result.add(queue.poll());
+ try {
+ result.add(queue.poll());
+ } catch (InterruptedException e) {
+ throw new FlinkFutureException(e);
+ }
}
return result;
- }
- }, executor);
+ },
+ executor);
Thread.sleep(10L);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
index fe9db95..d396756 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
@@ -18,8 +18,7 @@
package org.apache.flink.streaming.api.operators.async.queue;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -37,7 +36,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -176,14 +175,15 @@ public class StreamElementQueueTest extends TestLogger {
Assert.assertEquals(1, queue.size());
- Future<Void> putOperation = FlinkFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- queue.put(streamRecordQueueEntry2);
-
- return null;
- }
- }, executor);
+ CompletableFuture<Void> putOperation = CompletableFuture.runAsync(
+ () -> {
+ try {
+ queue.put(streamRecordQueueEntry2);
+ } catch (InterruptedException e) {
+ throw new FlinkFutureException(e);
+ }
+ },
+ executor);
// give the future a chance to complete
Thread.sleep(10L);
@@ -215,12 +215,15 @@ public class StreamElementQueueTest extends TestLogger {
Assert.assertTrue(queue.isEmpty());
- Future<AsyncResult> peekOperation = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
- @Override
- public AsyncResult call() throws Exception {
- return queue.peekBlockingly();
- }
- }, executor);
+ CompletableFuture<AsyncResult> peekOperation = CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return queue.peekBlockingly();
+ } catch (InterruptedException e) {
+ throw new FlinkFutureException(e);
+ }
+ },
+ executor);
Thread.sleep(10L);
@@ -236,12 +239,15 @@ public class StreamElementQueueTest extends TestLogger {
Assert.assertEquals(watermarkQueueEntry, queue.poll());
Assert.assertTrue(queue.isEmpty());
- Future<AsyncResult> pollOperation = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
- @Override
- public AsyncResult call() throws Exception {
- return queue.poll();
- }
- }, executor);
+ CompletableFuture<AsyncResult> pollOperation = CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return queue.poll();
+ } catch (InterruptedException e) {
+ throw new FlinkFutureException(e);
+ }
+ },
+ executor);
Thread.sleep(10L);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
index ba6ce42..cc0bc30 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
@@ -18,8 +18,7 @@
package org.apache.flink.streaming.api.operators.async.queue;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -35,7 +34,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -100,12 +99,15 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
Assert.assertTrue(8 == queue.size());
- Future<AsyncResult> firstPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
- @Override
- public AsyncResult call() throws Exception {
- return queue.poll();
- }
- }, executor);
+ CompletableFuture<AsyncResult> firstPoll = CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return queue.poll();
+ } catch (InterruptedException e) {
+ throw new FlinkFutureException(e);
+ }
+ },
+ executor);
// this should not fulfill the poll, because R3 is behind W1
record3.collect(Collections.<Integer>emptyList());
@@ -118,12 +120,15 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
Assert.assertEquals(record2, firstPoll.get());
- Future<AsyncResult> secondPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
- @Override
- public AsyncResult call() throws Exception {
- return queue.poll();
- }
- }, executor);
+ CompletableFuture<AsyncResult> secondPoll = CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return queue.poll();
+ } catch (InterruptedException e) {
+ throw new FlinkFutureException(e);
+ }
+ },
+ executor);
record6.collect(Collections.<Integer>emptyList());
record4.collect(Collections.<Integer>emptyList());
@@ -161,12 +166,15 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
// only R5 left in the queue
Assert.assertTrue(1 == queue.size());
- Future<AsyncResult> thirdPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
- @Override
- public AsyncResult call() throws Exception {
- return queue.poll();
- }
- }, executor);
+ CompletableFuture<AsyncResult> thirdPoll = CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return queue.poll();
+ } catch (InterruptedException e) {
+ throw new FlinkFutureException(e);
+ }
+ },
+ executor);
Thread.sleep(10L);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index f021b38..4f2135d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
@@ -75,6 +73,7 @@ import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.FutureTask;
import static org.junit.Assert.assertEquals;
@@ -160,14 +159,9 @@ public class StreamTaskTerminationTest extends TestLogger {
mock(PartitionProducerStateChecker.class),
Executors.directExecutor());
- Future<Void> taskRun = FlinkCompletableFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- task.run();
-
- return null;
- }
- }, TestingUtils.defaultExecutor());
+ CompletableFuture<Void> taskRun = CompletableFuture.runAsync(
+ () -> task.run(),
+ TestingUtils.defaultExecutor());
// wait until the stream task started running
RUN_LATCH.await();
[3/3] flink git commit: [FLINK-7334] [futures] Replace Flink's
futures with Java 8's CompletableFuture in RpcEndpoint,
RpcGateways and RpcService
Posted by tr...@apache.org.
[FLINK-7334] [futures] Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService
Remove Futures from RpcGateways
Remove Future usage
Fix failing AkkaRpcActorTest
This closes #4462.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eddafc1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eddafc1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eddafc1a
Branch: refs/heads/master
Commit: eddafc1ac9e4d787df44b63809f0d6dfd1f3def7
Parents: 74e479e
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Aug 1 11:33:48 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Aug 3 13:45:38 2017 +0200
----------------------------------------------------------------------
.../MesosResourceManagerTest.java | 6 +-
.../handlers/TaskManagerLogHandler.java | 4 +-
.../handlers/TaskManagerLogHandlerTest.java | 5 +-
.../runtime/dispatcher/DispatcherGateway.java | 6 +-
.../flink/runtime/executiongraph/Execution.java | 25 ++--
.../apache/flink/runtime/instance/SlotPool.java | 17 +--
.../flink/runtime/instance/SlotPoolGateway.java | 8 +-
.../slots/ActorTaskManagerGateway.java | 38 ++---
.../jobmanager/slots/TaskManagerGateway.java | 19 +--
.../flink/runtime/jobmaster/JobMaster.java | 93 ++++++-------
.../runtime/jobmaster/JobMasterGateway.java | 18 +--
.../jobmaster/RpcTaskManagerGateway.java | 25 ++--
.../registration/RetryingRegistration.java | 4 +-
.../resourcemanager/JobLeaderIdService.java | 16 +--
.../resourcemanager/ResourceManager.java | 29 ++--
.../resourcemanager/ResourceManagerGateway.java | 10 +-
.../slotmanager/SlotManager.java | 16 +--
.../flink/runtime/rpc/MainThreadExecutable.java | 4 +-
.../apache/flink/runtime/rpc/RpcEndpoint.java | 6 +-
.../apache/flink/runtime/rpc/RpcService.java | 10 +-
.../apache/flink/runtime/rpc/SelfGateway.java | 4 +-
.../runtime/rpc/akka/AkkaInvocationHandler.java | 30 ++--
.../flink/runtime/rpc/akka/AkkaRpcActor.java | 36 ++---
.../flink/runtime/rpc/akka/AkkaRpcService.java | 28 ++--
.../runtime/taskexecutor/JobLeaderService.java | 5 +-
.../runtime/taskexecutor/TaskExecutor.java | 20 ++-
.../taskexecutor/TaskExecutorGateway.java | 16 +--
...TaskExecutorToResourceManagerConnection.java | 3 +-
.../runtime/taskexecutor/TaskManagerRunner.java | 4 +-
.../taskexecutor/rpc/RpcInputSplitProvider.java | 4 +-
.../rpc/RpcPartitionStateChecker.java | 3 +-
.../RpcResultPartitionConsumableNotifier.java | 22 ++-
.../flink/runtime/jobmanager/JobManager.scala | 17 +--
.../runtime/akka/QuarantineMonitorTest.java | 16 +--
.../CheckpointCoordinatorMasterHooksTest.java | 5 +-
.../clusterframework/ResourceManagerTest.java | 6 +-
.../runtime/dispatcher/DispatcherTest.java | 5 +-
.../ExecutionGraphMetricsTest.java | 3 +-
.../ExecutionGraphSchedulingTest.java | 3 +-
.../executiongraph/ExecutionGraphStopTest.java | 7 +-
.../utils/NotCancelAckingTaskGateway.java | 9 +-
.../utils/SimpleAckingTaskManagerGateway.java | 36 ++---
.../flink/runtime/instance/SlotPoolRpcTest.java | 4 +-
.../flink/runtime/instance/SlotPoolTest.java | 20 +--
.../flink/runtime/jobmaster/JobMasterTest.java | 5 +-
.../registration/RetryingRegistrationTest.java | 13 +-
.../registration/TestRegistrationGateway.java | 7 +-
.../resourcemanager/JobLeaderIdServiceTest.java | 8 +-
.../ResourceManagerJobMasterTest.java | 22 +--
.../ResourceManagerTaskExecutorTest.java | 10 +-
.../slotmanager/SlotManagerTest.java | 138 +++++++------------
.../slotmanager/SlotProtocolTest.java | 6 +-
.../flink/runtime/rpc/AsyncCallsTest.java | 12 +-
.../flink/runtime/rpc/RpcCompletenessTest.java | 6 +-
.../flink/runtime/rpc/RpcConnectionTest.java | 4 +-
.../flink/runtime/rpc/TestingGatewayBase.java | 9 +-
.../flink/runtime/rpc/TestingRpcService.java | 13 +-
.../runtime/rpc/TestingSerialRpcService.java | 37 +++--
.../runtime/rpc/akka/AkkaRpcActorTest.java | 46 +++----
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 25 +---
.../rpc/akka/MessageSerializationTest.java | 8 +-
.../taskexecutor/TaskExecutorITCase.java | 9 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 39 +++---
.../runtime/taskmanager/TaskManagerTest.java | 11 +-
.../queue/OrderedStreamElementQueueTest.java | 20 +--
.../async/queue/StreamElementQueueTest.java | 52 +++----
.../queue/UnorderedStreamElementQueueTest.java | 50 ++++---
.../tasks/StreamTaskTerminationTest.java | 14 +-
68 files changed, 559 insertions(+), 670 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 6e6a59c..e63b4ab 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
@@ -89,6 +88,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -414,7 +414,7 @@ public class MesosResourceManagerTest extends TestLogger {
* Register a job master with the RM.
*/
public void registerJobMaster(MockJobMaster jobMaster) throws Exception {
- Future<RegistrationResponse> registration = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> registration = resourceManager.registerJobManager(
rmServices.rmLeaderSessionId, jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, jobMaster.jobID);
assertTrue(registration.get() instanceof JobMasterRegistrationSuccess);
}
@@ -588,7 +588,7 @@ public class MesosResourceManagerTest extends TestLogger {
assertThat(resourceManager.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
// send registration message
- Future<RegistrationResponse> successfulFuture =
+ CompletableFuture<RegistrationResponse> successfulFuture =
resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, task1Executor.address, task1Executor.resourceID, slotReport);
RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
assertTrue(response instanceof TaskExecutorRegistrationSuccess);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 9cbb71d..ce29721 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -196,10 +196,10 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
switch (fileMode) {
case LOG:
- return FutureUtils.toJava(taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout));
+ return taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout);
case STDOUT:
default:
- return FutureUtils.toJava(taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout));
+ return taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout);
}
}
);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
index bfcaf88..e1c3686 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -25,9 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
@@ -50,6 +48,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import scala.Option;
@@ -109,7 +108,7 @@ public class TaskManagerLogHandlerTest {
when(taskManager.getId()).thenReturn(tmID);
when(taskManager.getTaskManagerID()).thenReturn(tmRID);
when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
- CompletableFuture<BlobKey> future = new FlinkCompletableFuture<>();
+ CompletableFuture<BlobKey> future = new CompletableFuture<>();
future.completeExceptionally(new IOException("failure"));
when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index c730bc1..33b8a42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -20,13 +20,13 @@ package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
/**
* Gateway for the Dispatcher component.
@@ -40,7 +40,7 @@ public interface DispatcherGateway extends RpcGateway {
* @param timeout RPC timeout
* @return A future acknowledge if the submission succeeded
*/
- Future<Acknowledge> submitJob(
+ CompletableFuture<Acknowledge> submitJob(
JobGraph jobGraph,
@RpcTimeout Time timeout);
@@ -50,6 +50,6 @@ public interface DispatcherGateway extends RpcGateway {
* @param timeout RPC timeout
* @return A future collection of currently submitted jobs
*/
- Future<Collection<JobID>> listJobs(
+ CompletableFuture<Collection<JobID>> listJobs(
@RpcTimeout Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 5cb12ea..bd5bc7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -418,8 +418,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
- final CompletableFuture<Acknowledge> submitResultFuture = FutureUtils.toJava(
- taskManagerGateway.submitTask(deployment, timeout));
+ final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
submitResultFuture.whenCompleteAsync(
(ack, failure) -> {
@@ -454,7 +453,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
CompletableFuture<Acknowledge> stopResultFuture = FutureUtils.retry(
- () -> FutureUtils.toJava(taskManagerGateway.stopTask(attemptId, timeout)),
+ () -> taskManagerGateway.stopTask(attemptId, timeout),
NUM_STOP_CALL_TRIES,
executor);
@@ -679,14 +678,13 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
- return FutureUtils.toJava(
- taskManagerGateway.requestStackTraceSample(
- attemptId,
- sampleId,
- numSamples,
- delayBetweenSamples,
- maxStrackTraceDepth,
- timeout));
+ return taskManagerGateway.requestStackTraceSample(
+ attemptId,
+ sampleId,
+ numSamples,
+ delayBetweenSamples,
+ maxStrackTraceDepth,
+ timeout);
} else {
return FutureUtils.completedExceptionally(new Exception("The execution has no slot assigned."));
}
@@ -1011,7 +1009,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
CompletableFuture<Acknowledge> cancelResultFuture = FutureUtils.retry(
- () -> FutureUtils.toJava(taskManagerGateway.cancelTask(attemptId, timeout)),
+ () -> taskManagerGateway.cancelTask(attemptId, timeout),
NUM_CANCEL_CALL_TRIES,
executor);
@@ -1050,8 +1048,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
- CompletableFuture<Acknowledge> updatePartitionsResultFuture = FutureUtils.toJava(
- taskManagerGateway.updatePartitions(attemptId, partitionInfos, timeout));
+ CompletableFuture<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, timeout);
updatePartitionsResultFuture.whenCompleteAsync(
(ack, failure) -> {
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 9a26779..508e54f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -25,8 +25,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -263,12 +261,12 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
// ------------------------------------------------------------------------
@RpcMethod
- public Future<SimpleSlot> allocateSlot(
+ public CompletableFuture<SimpleSlot> allocateSlot(
ScheduledUnit task,
ResourceProfile resources,
Iterable<TaskManagerLocation> locationPreferences) {
- return FutureUtils.toFlinkFuture(internalAllocateSlot(task, resources, locationPreferences));
+ return internalAllocateSlot(task, resources, locationPreferences);
}
@RpcMethod
@@ -316,11 +314,10 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources));
- CompletableFuture<Acknowledge> rmResponse = FutureUtils.toJava(
- resourceManagerGateway.requestSlot(
- jobManagerLeaderId, resourceManagerLeaderId,
- new SlotRequest(jobId, allocationID, resources, jobManagerAddress),
- resourceManagerRequestsTimeout));
+ CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
+ jobManagerLeaderId, resourceManagerLeaderId,
+ new SlotRequest(jobId, allocationID, resources, jobManagerAddress),
+ resourceManagerRequestsTimeout);
CompletableFuture<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(
(Acknowledge value) -> {
@@ -984,7 +981,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
Iterable<TaskManagerLocation> locationPreferences =
task.getTaskToExecute().getVertex().getPreferredLocations();
- return FutureUtils.toJava(gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout));
+ return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index 42942ca..43f407a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
/**
* The gateway for calls on the {@link SlotPool}.
@@ -75,9 +75,9 @@ public interface SlotPoolGateway extends RpcGateway {
void releaseTaskManager(ResourceID resourceID);
- Future<Boolean> offerSlot(AllocatedSlot slot);
+ CompletableFuture<Boolean> offerSlot(AllocatedSlot slot);
- Future<Iterable<SlotOffer>> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers);
+ CompletableFuture<Iterable<SlotOffer>> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers);
void failAllocation(AllocationID allocationID, Exception cause);
@@ -85,7 +85,7 @@ public interface SlotPoolGateway extends RpcGateway {
// allocating and disposing slots
// ------------------------------------------------------------------------
- Future<SimpleSlot> allocateSlot(
+ CompletableFuture<SimpleSlot> allocateSlot(
ScheduledUnit task,
ResourceProfile resources,
Iterable<TaskManagerLocation> locationPreferences,
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
index 2876ebe..a773ce9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -24,8 +24,7 @@ import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -41,6 +40,9 @@ import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.CompletableFuture;
+
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
@@ -78,7 +80,7 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
}
@Override
- public Future<StackTrace> requestStackTrace(final Time timeout) {
+ public CompletableFuture<StackTrace> requestStackTrace(final Time timeout) {
Preconditions.checkNotNull(timeout);
scala.concurrent.Future<StackTrace> stackTraceFuture = actorGateway.ask(
@@ -86,11 +88,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
new FiniteDuration(timeout.getSize(), timeout.getUnit()))
.mapTo(ClassTag$.MODULE$.<StackTrace>apply(StackTrace.class));
- return new FlinkFuture<>(stackTraceFuture);
+ return FutureUtils.toJava(stackTraceFuture);
}
@Override
- public Future<StackTraceSampleResponse> requestStackTraceSample(
+ public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
ExecutionAttemptID executionAttemptID,
int sampleId,
int numSamples,
@@ -113,11 +115,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
new FiniteDuration(timeout.getSize(), timeout.getUnit()))
.mapTo(ClassTag$.MODULE$.<StackTraceSampleResponse>apply(StackTraceSampleResponse.class));
- return new FlinkFuture<>(stackTraceSampleResponseFuture);
+ return FutureUtils.toJava(stackTraceSampleResponseFuture);
}
@Override
- public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+ public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
Preconditions.checkNotNull(tdd);
Preconditions.checkNotNull(timeout);
@@ -126,11 +128,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
new FiniteDuration(timeout.getSize(), timeout.getUnit()))
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
- return new FlinkFuture<>(submitResult);
+ return FutureUtils.toJava(submitResult);
}
@Override
- public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
Preconditions.checkNotNull(executionAttemptID);
Preconditions.checkNotNull(timeout);
@@ -139,11 +141,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
new FiniteDuration(timeout.getSize(), timeout.getUnit()))
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
- return new FlinkFuture<>(stopResult);
+ return FutureUtils.toJava(stopResult);
}
@Override
- public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
Preconditions.checkNotNull(executionAttemptID);
Preconditions.checkNotNull(timeout);
@@ -152,11 +154,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
new FiniteDuration(timeout.getSize(), timeout.getUnit()))
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
- return new FlinkFuture<>(cancelResult);
+ return FutureUtils.toJava(cancelResult);
}
@Override
- public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
Preconditions.checkNotNull(executionAttemptID);
Preconditions.checkNotNull(partitionInfos);
@@ -169,7 +171,7 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
new FiniteDuration(timeout.getSize(), timeout.getUnit()))
.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
- return new FlinkFuture<>(updatePartitionsResult);
+ return FutureUtils.toJava(updatePartitionsResult);
}
@Override
@@ -207,16 +209,16 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
}
@Override
- public Future<BlobKey> requestTaskManagerLog(Time timeout) {
+ public CompletableFuture<BlobKey> requestTaskManagerLog(Time timeout) {
return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerLog(), timeout);
}
@Override
- public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
+ public CompletableFuture<BlobKey> requestTaskManagerStdout(Time timeout) {
return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
}
- private Future<BlobKey> requestTaskManagerLog(TaskManagerMessages.RequestTaskManagerLog request, Time timeout) {
+ private CompletableFuture<BlobKey> requestTaskManagerLog(TaskManagerMessages.RequestTaskManagerLog request, Time timeout) {
Preconditions.checkNotNull(request);
Preconditions.checkNotNull(timeout);
@@ -226,6 +228,6 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
new FiniteDuration(timeout.getSize(), timeout.getUnit()))
.mapTo(ClassTag$.MODULE$.<BlobKey>apply(BlobKey.class));
- return new FlinkFuture<>(blobKeyFuture);
+ return FutureUtils.toJava(blobKeyFuture);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index 09f104f..36cea86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -32,6 +31,8 @@ import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTrace;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import java.util.concurrent.CompletableFuture;
+
/**
* Task manager gateway interface to communicate with the task manager.
*/
@@ -66,7 +67,7 @@ public interface TaskManagerGateway {
* @param timeout for the stack trace request
* @return Future for a stack trace
*/
- Future<StackTrace> requestStackTrace(final Time timeout);
+ CompletableFuture<StackTrace> requestStackTrace(final Time timeout);
/**
* Request a stack trace sample from the given task.
@@ -79,7 +80,7 @@ public interface TaskManagerGateway {
* @param timeout of the request
* @return Future of stack trace sample response
*/
- Future<StackTraceSampleResponse> requestStackTraceSample(
+ CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
final ExecutionAttemptID executionAttemptID,
final int sampleId,
final int numSamples,
@@ -94,7 +95,7 @@ public interface TaskManagerGateway {
* @param timeout of the submit operation
* @return Future acknowledge of the successful operation
*/
- Future<Acknowledge> submitTask(
+ CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd,
Time timeout);
@@ -105,7 +106,7 @@ public interface TaskManagerGateway {
* @param timeout of the submit operation
* @return Future acknowledge if the task is successfully stopped
*/
- Future<Acknowledge> stopTask(
+ CompletableFuture<Acknowledge> stopTask(
ExecutionAttemptID executionAttemptID,
Time timeout);
@@ -116,7 +117,7 @@ public interface TaskManagerGateway {
* @param timeout of the submit operation
* @return Future acknowledge if the task is successfully canceled
*/
- Future<Acknowledge> cancelTask(
+ CompletableFuture<Acknowledge> cancelTask(
ExecutionAttemptID executionAttemptID,
Time timeout);
@@ -128,7 +129,7 @@ public interface TaskManagerGateway {
* @param timeout of the submit operation
* @return Future acknowledge if the partitions have been successfully updated
*/
- Future<Acknowledge> updatePartitions(
+ CompletableFuture<Acknowledge> updatePartitions(
ExecutionAttemptID executionAttemptID,
Iterable<PartitionInfo> partitionInfos,
Time timeout);
@@ -176,7 +177,7 @@ public interface TaskManagerGateway {
* @param timeout for the request
* @return Future blob key under which the task manager log has been stored
*/
- Future<BlobKey> requestTaskManagerLog(final Time timeout);
+ CompletableFuture<BlobKey> requestTaskManagerLog(final Time timeout);
/**
* Request the task manager stdout from the task manager.
@@ -184,5 +185,5 @@ public interface TaskManagerGateway {
* @param timeout for the request
* @return Future blob key under which the task manager stdout file has been stored
*/
- Future<BlobKey> requestTaskManagerStdout(final Time timeout);
+ CompletableFuture<BlobKey> requestTaskManagerStdout(final Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7922baa..e2e117a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -36,10 +36,6 @@ import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.Execution;
@@ -105,8 +101,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -684,7 +680,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
@RpcMethod
- public Future<Iterable<SlotOffer>> offerSlots(
+ public CompletableFuture<Iterable<SlotOffer>> offerSlots(
final ResourceID taskManagerId,
final Iterable<SlotOffer> slots,
final UUID leaderId) throws Exception {
@@ -736,7 +732,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
@RpcMethod
- public Future<RegistrationResponse> registerTaskManager(
+ public CompletableFuture<RegistrationResponse> registerTaskManager(
final String taskManagerRpcAddress,
final TaskManagerLocation taskManagerLocation,
final UUID leaderId) throws Exception
@@ -755,48 +751,42 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
if (registeredTaskManagers.containsKey(taskManagerId)) {
final RegistrationResponse response = new JMTMRegistrationSuccess(
resourceId, libraryCacheManager.getBlobServerPort());
- return FlinkCompletableFuture.completed(response);
+ return CompletableFuture.completedFuture(response);
} else {
- return getRpcService().execute(new Callable<TaskExecutorGateway>() {
- @Override
- public TaskExecutorGateway call() throws Exception {
- return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
- .get(rpcTimeout.getSize(), rpcTimeout.getUnit());
- }
- }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
- @Override
- public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
- if (throwable != null) {
- return new RegistrationResponse.Decline(throwable.getMessage());
- }
-
- if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
- log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
- "leader session ID {} did not equal the received leader session ID {}.",
- taskManagerId, taskManagerRpcAddress,
- JobMaster.this.leaderSessionID, leaderId);
- return new RegistrationResponse.Decline("Invalid leader session id");
- }
-
- slotPoolGateway.registerTaskManager(taskManagerId);
- registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
-
- // monitor the task manager as heartbeat target
- taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
- @Override
- public void receiveHeartbeat(ResourceID resourceID, Void payload) {
- // the task manager will not request heartbeat, so this method will never be called currently
+ return getRpcService()
+ .connect(taskManagerRpcAddress, TaskExecutorGateway.class)
+ .handleAsync(
+ (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
+ if (throwable != null) {
+ return new RegistrationResponse.Decline(throwable.getMessage());
}
- @Override
- public void requestHeartbeat(ResourceID resourceID, Void payload) {
- taskExecutorGateway.heartbeatFromJobManager(resourceID);
+ if (!Objects.equals(leaderSessionID, leaderId)) {
+ log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
+ "leader session ID {} did not equal the received leader session ID {}.",
+ taskManagerId, taskManagerRpcAddress, leaderSessionID, leaderId);
+ return new RegistrationResponse.Decline("Invalid leader session id");
}
- });
- return new JMTMRegistrationSuccess(resourceId, libraryCacheManager.getBlobServerPort());
- }
- }, getMainThreadExecutor());
+ slotPoolGateway.registerTaskManager(taskManagerId);
+ registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
+
+ // monitor the task manager as heartbeat target
+ taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
+ @Override
+ public void receiveHeartbeat(ResourceID resourceID, Void payload) {
+ // the task manager will not request heartbeat, so this method will never be called currently
+ }
+
+ @Override
+ public void requestHeartbeat(ResourceID resourceID, Void payload) {
+ taskExecutorGateway.heartbeatFromJobManager(resourceID);
+ }
+ });
+
+ return new JMTMRegistrationSuccess(resourceId, libraryCacheManager.getBlobServerPort());
+ },
+ getMainThreadExecutor());
}
}
@@ -1051,14 +1041,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
{
Time timeout = Time.milliseconds(timeoutMillis);
- return FutureUtils.toJava(
- gateway.registerJobManager(
- leaderId,
- jobManagerLeaderID,
- jobManagerResourceID,
- jobManagerRpcAddress,
- jobID,
- timeout));
+ return gateway.registerJobManager(
+ leaderId,
+ jobManagerLeaderID,
+ jobManagerResourceID,
+ jobManagerRpcAddress,
+ jobID,
+ timeout);
}
};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 5a271f9..e3611a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -45,6 +44,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
/**
* {@link JobMaster} rpc gateway interface
@@ -68,7 +68,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
* @param taskExecutionState New task execution state for a given task
* @return Future flag of the task execution state update result
*/
- Future<Acknowledge> updateTaskExecutionState(
+ CompletableFuture<Acknowledge> updateTaskExecutionState(
final UUID leaderSessionID,
final TaskExecutionState taskExecutionState);
@@ -81,7 +81,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
* @param executionAttempt The execution attempt id
* @return The future of the input split. If there is no further input split, will return an empty object.
*/
- Future<SerializedInputSplit> requestNextInputSplit(
+ CompletableFuture<SerializedInputSplit> requestNextInputSplit(
final UUID leaderSessionID,
final JobVertexID vertexID,
final ExecutionAttemptID executionAttempt);
@@ -95,7 +95,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
* @param partitionId The partition ID of the partition to request the state of.
* @return The future of the partition state
*/
- Future<ExecutionState> requestPartitionState(
+ CompletableFuture<ExecutionState> requestPartitionState(
final UUID leaderSessionID,
final IntermediateDataSetID intermediateResultId,
final ResultPartitionID partitionId);
@@ -114,7 +114,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
* @param timeout before the rpc call fails
* @return Future acknowledge of the schedule or update operation
*/
- Future<Acknowledge> scheduleOrUpdateConsumers(
+ CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(
final UUID leaderSessionID,
final ResultPartitionID partitionID,
@RpcTimeout final Time timeout);
@@ -146,7 +146,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
* @param registrationName Name under which the KvState has been registered.
* @return Future of the requested {@link InternalKvState} location
*/
- Future<KvStateLocation> lookupKvStateLocation(final String registrationName);
+ CompletableFuture<KvStateLocation> lookupKvStateLocation(final String registrationName);
/**
* @param jobVertexId JobVertexID the KvState instance belongs to.
@@ -175,7 +175,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
/**
* Request the classloading props of this job.
*/
- Future<ClassloadingProps> requestClassloadingProps();
+ CompletableFuture<ClassloadingProps> requestClassloadingProps();
/**
* Offer the given slots to the job manager. The response contains the set of accepted slots.
@@ -186,7 +186,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
* @param timeout for the rpc call
* @return Future set of accepted slots.
*/
- Future<Iterable<SlotOffer>> offerSlots(
+ CompletableFuture<Iterable<SlotOffer>> offerSlots(
final ResourceID taskManagerId,
final Iterable<SlotOffer> slots,
final UUID leaderId,
@@ -214,7 +214,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
* @param timeout for the rpc call
* @return Future registration response indicating whether the registration was successful or not
*/
- Future<RegistrationResponse> registerTaskManager(
+ CompletableFuture<RegistrationResponse> registerTaskManager(
final String taskManagerRpcAddress,
final TaskManagerLocation taskManagerLocation,
final UUID leaderId,
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 28fef27..e93c907 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.util.Preconditions;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
/**
* Implementation of the {@link TaskManagerGateway} for Flink's RPC system
@@ -68,47 +68,40 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
}
@Override
- public Future<StackTrace> requestStackTrace(Time timeout) {
+ public CompletableFuture<StackTrace> requestStackTrace(Time timeout) {
// return taskExecutorGateway.requestStackTrace(timeout);
throw new UnsupportedOperationException("Operation is not yet supported.");
}
@Override
- public Future<StackTraceSampleResponse> requestStackTraceSample(
+ public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
ExecutionAttemptID executionAttemptID,
int sampleId,
int numSamples,
Time delayBetweenSamples,
int maxStackTraceDepth,
Time timeout) {
-// return taskExecutorGateway.requestStackTraceSample(
-// executionAttemptID,
-// sampleId,
-// numSamples,
-// delayBetweenSamples,
-// maxStackTraceDepth,
-// timeout);
throw new UnsupportedOperationException("Operation is not yet supported.");
}
@Override
- public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+ public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
return taskExecutorGateway.submitTask(tdd, leaderId, timeout);
}
@Override
- public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
return taskExecutorGateway.stopTask(executionAttemptID, timeout);
}
@Override
- public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
return taskExecutorGateway.cancelTask(executionAttemptID, timeout);
}
@Override
- public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
return taskExecutorGateway.updatePartitions(executionAttemptID, partitionInfos, timeout);
}
@@ -130,13 +123,13 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
}
@Override
- public Future<BlobKey> requestTaskManagerLog(Time timeout) {
+ public CompletableFuture<BlobKey> requestTaskManagerLog(Time timeout) {
// return taskExecutorGateway.requestTaskManagerLog(timeout);
throw new UnsupportedOperationException("Operation is not yet supported.");
}
@Override
- public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
+ public CompletableFuture<BlobKey> requestTaskManagerStdout(Time timeout) {
// return taskExecutorGateway.requestTaskManagerStdout(timeout);
throw new UnsupportedOperationException("Operation is not yet supported.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index 1034a89..6a18ffd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.registration;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
@@ -176,8 +175,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
public void startRegistration() {
try {
// trigger resolution of the resource manager address to a callable gateway
- CompletableFuture<Gateway> resourceManagerFuture = FutureUtils.toJava(
- rpcService.connect(targetAddress, targetType));
+ CompletableFuture<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType);
// upon success, start the registration attempts
CompletableFuture<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync(
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index 8bffcd0..aaa72d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -20,10 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -37,7 +34,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -180,7 +178,7 @@ public class JobLeaderIdService {
return jobLeaderIdListeners.containsKey(jobId);
}
- public Future<UUID> getLeaderId(JobID jobId) throws Exception {
+ public CompletableFuture<UUID> getLeaderId(JobID jobId) throws Exception {
if (!jobLeaderIdListeners.containsKey(jobId)) {
addJob(jobId);
}
@@ -235,7 +233,7 @@ public class JobLeaderIdService {
this.listenerJobLeaderIdActions = Preconditions.checkNotNull(listenerJobLeaderIdActions);
this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
- leaderIdFuture = new FlinkCompletableFuture<>();
+ leaderIdFuture = new CompletableFuture<>();
activateTimeout();
@@ -243,7 +241,7 @@ public class JobLeaderIdService {
leaderRetrievalService.start(this);
}
- public Future<UUID> getLeaderIdFuture() {
+ public CompletableFuture<UUID> getLeaderIdFuture() {
return leaderIdFuture;
}
@@ -269,12 +267,12 @@ public class JobLeaderIdService {
if (leaderIdFuture.isDone()) {
try {
previousJobLeaderId = leaderIdFuture.getNow(null);
- } catch (ExecutionException e) {
+ } catch (CompletionException e) {
// this should never happen since we complete this future always properly
handleError(e);
}
- leaderIdFuture = FlinkCompletableFuture.completed(leaderSessionId);
+ leaderIdFuture = CompletableFuture.completedFuture(leaderSessionId);
} else {
leaderIdFuture.complete(leaderSessionId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 5e33c0e..8318c09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
@@ -246,7 +245,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
// ------------------------------------------------------------------------
@RpcMethod
- public Future<RegistrationResponse> registerJobManager(
+ public CompletableFuture<RegistrationResponse> registerJobManager(
final UUID resourceManagerLeaderId,
final UUID jobManagerLeaderId,
final ResourceID jobManagerResourceId,
@@ -270,13 +269,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
onFatalErrorAsync(exception);
log.error("Could not add job {} to job leader id service.", jobId, e);
- return FutureUtils.toFlinkFuture(FutureUtils.completedExceptionally(exception));
+ return FutureUtils.completedExceptionally(exception);
}
}
log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);
- Future<UUID> jobLeaderIdFuture;
+ CompletableFuture<UUID> jobLeaderIdFuture;
try {
jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
@@ -289,12 +288,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
onFatalErrorAsync(exception);
log.debug("Could not obtain the job leader id future to verify the correct job leader.");
- return FutureUtils.toFlinkFuture(FutureUtils.completedExceptionally(exception));
+ return FutureUtils.completedExceptionally(exception);
}
- Future<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class);
+ CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class);
- Future<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(
+ CompletableFuture<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(
jobLeaderIdFuture,
(JobMasterGateway jobMasterGateway, UUID jobLeaderId) -> {
if (isValid(resourceManagerLeaderId)) {
@@ -339,8 +338,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
"{} did not match the expected leader id {}.", jobManagerAddress,
resourceManagerLeaderId, leaderSessionId);
- return FutureUtils.toFlinkFuture(CompletableFuture.<RegistrationResponse>completedFuture(
- new RegistrationResponse.Decline("Resource manager leader id did not match.")));
+ return CompletableFuture.completedFuture(
+ new RegistrationResponse.Decline("Resource manager leader id did not match."));
}
}
@@ -354,14 +353,14 @@ public abstract class ResourceManager<WorkerType extends Serializable>
* @return The response by the ResourceManager.
*/
@RpcMethod
- public Future<RegistrationResponse> registerTaskExecutor(
+ public CompletableFuture<RegistrationResponse> registerTaskExecutor(
final UUID resourceManagerLeaderId,
final String taskExecutorAddress,
final ResourceID taskExecutorResourceId,
final SlotReport slotReport) {
if (Objects.equals(leaderSessionId, resourceManagerLeaderId)) {
- Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+ CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
return taskExecutorGatewayFuture.handleAsync(
(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
@@ -381,10 +380,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
"not equal the received leader session ID {}",
taskExecutorResourceId, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId);
- return FutureUtils.toFlinkFuture(CompletableFuture.<RegistrationResponse>completedFuture(
+ return CompletableFuture.completedFuture(
new RegistrationResponse.Decline("Discard registration because the leader id " +
resourceManagerLeaderId + " does not match the expected leader id " +
- leaderSessionId + '.')));
+ leaderSessionId + '.'));
}
}
@@ -493,8 +492,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
if(infoMessageListeners.containsKey(address)) {
log.warn("Receive a duplicate registration from info message listener on ({})", address);
} else {
- CompletableFuture<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = FutureUtils.toJava(
- getRpcService().connect(address, InfoMessageListenerRpcGateway.class));
+ CompletableFuture<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService()
+ .connect(address, InfoMessageListenerRpcGateway.class);
infoMessageListenerRpcGatewayFuture.whenCompleteAsync(
(InfoMessageListenerRpcGateway gateway, Throwable failure) -> {
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index eb091c4..1ba6893 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
/**
* The {@link ResourceManager}'s RPC gateway interface.
@@ -51,7 +51,7 @@ public interface ResourceManagerGateway extends RpcGateway {
* @param timeout Timeout for the future to complete
* @return Future registration response
*/
- Future<RegistrationResponse> registerJobManager(
+ CompletableFuture<RegistrationResponse> registerJobManager(
UUID resourceManagerLeaderId,
UUID jobMasterLeaderId,
ResourceID jobMasterResourceId,
@@ -67,7 +67,7 @@ public interface ResourceManagerGateway extends RpcGateway {
* @param slotRequest The slot to request
* @return The confirmation that the slot gets allocated
*/
- Future<Acknowledge> requestSlot(
+ CompletableFuture<Acknowledge> requestSlot(
UUID resourceManagerLeaderID,
UUID jobMasterLeaderID,
SlotRequest slotRequest,
@@ -84,7 +84,7 @@ public interface ResourceManagerGateway extends RpcGateway {
*
* @return The future to the response by the ResourceManager.
*/
- Future<RegistrationResponse> registerTaskExecutor(
+ CompletableFuture<RegistrationResponse> registerTaskExecutor(
UUID resourceManagerLeaderId,
String taskExecutorAddress,
ResourceID resourceID,
@@ -133,7 +133,7 @@ public interface ResourceManagerGateway extends RpcGateway {
* @param leaderSessionId The leader session ID with which to address the ResourceManager.
* @return The future to the number of registered TaskManagers.
*/
- Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);
+ CompletableFuture<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);
/**
* Sends the heartbeat to resource manager from task manager
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 8354525..3bda409 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -639,14 +638,13 @@ public class SlotManager implements AutoCloseable {
}
// RPC call to the task manager
- CompletableFuture<Acknowledge> requestFuture = FutureUtils.toJava(
- gateway.requestSlot(
- slotId,
- pendingSlotRequest.getJobId(),
- allocationId,
- pendingSlotRequest.getTargetAddress(),
- leaderId,
- taskManagerRequestTimeout));
+ CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
+ slotId,
+ pendingSlotRequest.getJobId(),
+ allocationId,
+ pendingSlotRequest.getTargetAddress(),
+ leaderId,
+ taskManagerRequestTimeout);
requestFuture.whenComplete(
(Acknowledge acknowledge, Throwable throwable) -> {
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
index ec1c984..6e36bd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java
@@ -19,9 +19,9 @@
package org.apache.flink.runtime.rpc;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Future;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
/**
@@ -51,7 +51,7 @@ public interface MainThreadExecutable {
* @param <V> Return value of the callable
* @return Future of the callable result
*/
- <V> Future<V> callAsync(Callable<V> callable, Time callTimeout);
+ <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout);
/**
* Execute the runnable in the main thread of the underlying RPC endpoint, with
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 331f3a3..b5bbc2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.rpc;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ReflectionUtil;
import org.slf4j.Logger;
@@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -214,7 +214,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
*
* @return Future which is completed when the rpc endpoint has been terminated.
*/
- public Future<Void> getTerminationFuture() {
+ public CompletableFuture<Void> getTerminationFuture() {
return ((SelfGateway)self).getTerminationFuture();
}
@@ -264,7 +264,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
* @param <V> Return type of the callable
* @return Future for the result of the callable.
*/
- protected <V> Future<V> callAsync(Callable<V> callable, Time timeout) {
+ protected <V> CompletableFuture<V> callAsync(Callable<V> callable, Time timeout) {
return ((MainThreadExecutable) self).callAsync(callable, timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 51b7ca2..a92f3e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -18,11 +18,11 @@
package org.apache.flink.runtime.rpc;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -61,7 +61,7 @@ public interface RpcService {
* @return Future containing the rpc gateway or an {@link RpcConnectionException} if the
* connection attempt failed
*/
- <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz);
+ <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);
/**
* Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint.
@@ -91,7 +91,7 @@ public interface RpcService {
*
* @return Termination future
*/
- Future<Void> getTerminationFuture();
+ CompletableFuture<Void> getTerminationFuture();
/**
* Gets the executor, provided by this RPC service. This executor can be used for example for
@@ -145,7 +145,7 @@ public interface RpcService {
void execute(Runnable runnable);
/**
- * Execute the given callable and return its result as a {@link Future}. This method can be used
+ * Execute the given callable and return its result as a {@link CompletableFuture}. This method can be used
* to run code outside of the main thread of a {@link RpcEndpoint}.
*
* <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
@@ -158,5 +158,5 @@ public interface RpcService {
* @param <T> is the return value type
* @return Future containing the callable's future result
*/
- <T> Future<T> execute(Callable<T> callable);
+ <T> CompletableFuture<T> execute(Callable<T> callable);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
index ed8ef9d..d39b1ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/SelfGateway.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.rpc;
-import org.apache.flink.runtime.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
/**
* Interface for self gateways
@@ -30,5 +30,5 @@ public interface SelfGateway {
*
* @return Future indicating when the rpc endpoint has been terminated
*/
- Future<Void> getTerminationFuture();
+ CompletableFuture<Void> getTerminationFuture();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index c21accf..ae6b832 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -22,8 +22,7 @@ import akka.actor.ActorRef;
import akka.pattern.Patterns;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.SelfGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -44,7 +43,9 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.BitSet;
+import java.util.Objects;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -79,7 +80,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
private final long maximumFramesize;
// null if gateway; otherwise non-null
- private final Future<Void> terminationFuture;
+ private final CompletableFuture<Void> terminationFuture;
AkkaInvocationHandler(
String address,
@@ -87,7 +88,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
ActorRef rpcEndpoint,
Time timeout,
long maximumFramesize,
- Future<Void> terminationFuture) {
+ CompletableFuture<Void> terminationFuture) {
this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
@@ -146,20 +147,19 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
Class<?> returnType = method.getReturnType();
- if (returnType.equals(Void.TYPE)) {
+ if (Objects.equals(returnType, Void.TYPE)) {
rpcEndpoint.tell(rpcInvocation, ActorRef.noSender());
result = null;
- } else if (returnType.equals(Future.class)) {
+ } else if (Objects.equals(returnType,CompletableFuture.class)) {
// execute an asynchronous call
- result = new FlinkFuture<>(Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds()));
+ result = FutureUtils.toJava(Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds()));
} else {
// execute a synchronous call
- scala.concurrent.Future<?> scalaFuture = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds());
+ CompletableFuture<?> futureResult = FutureUtils.toJava(
+ Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds()));
- Future<?> futureResult = new FlinkFuture<>(scalaFuture);
-
- return futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
+ result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
}
}
@@ -191,12 +191,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
}
@Override
- public <V> Future<V> callAsync(Callable<V> callable, Time callTimeout) {
+ public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
if(isLocal) {
@SuppressWarnings("unchecked")
- scala.concurrent.Future<V> result = (scala.concurrent.Future<V>) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout.toMilliseconds());
+ scala.concurrent.Future<V> resultFuture = (scala.concurrent.Future<V>) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout.toMilliseconds());
- return new FlinkFuture<>(result);
+ return FutureUtils.toJava(resultFuture);
} else {
throw new RuntimeException("Trying to send a Callable to a remote actor at " +
rpcEndpoint.path() + ". This is not supported.");
@@ -331,7 +331,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
}
@Override
- public Future<Void> getTerminationFuture() {
+ public CompletableFuture<Void> getTerminationFuture() {
return terminationFuture;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index fe3fcc9..5845473 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -21,12 +21,8 @@ package org.apache.flink.runtime.rpc.akka;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.UntypedActor;
-import akka.dispatch.Futures;
import akka.japi.Procedure;
import akka.pattern.Patterns;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -44,11 +40,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.impl.Promise;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -209,24 +207,20 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
return;
}
- if (result instanceof Future) {
- final Future<?> future = (Future<?>) result;
-
- // pipe result to sender
- if (future instanceof FlinkFuture) {
- // FlinkFutures are currently backed by Scala's futures
- FlinkFuture<?> flinkFuture = (FlinkFuture<?>) future;
-
- Patterns.pipe(flinkFuture.getScalaFuture(), getContext().dispatcher()).to(getSender());
- } else {
- // We have to unpack the Flink future and pack it into a Scala future
- Patterns.pipe(Futures.future(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- return future.get();
+ if (result instanceof CompletableFuture) {
+ final CompletableFuture<?> future = (CompletableFuture<?>) result;
+ Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>();
+
+ future.whenComplete(
+ (value, throwable) -> {
+ if (throwable != null) {
+ promise.failure(throwable);
+ } else {
+ promise.success(value);
}
- }, getContext().dispatcher()), getContext().dispatcher());
- }
+ });
+
+ Patterns.pipe(promise.future(), getContext().dispatcher()).to(getSender());
} else {
// tell the sender the result of the computation
getSender().tell(new Status.Success(result), getSelf());
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 2f02e8c..80267f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -31,11 +31,8 @@ import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -57,6 +54,7 @@ import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
@@ -133,7 +131,7 @@ public class AkkaRpcService implements RpcService {
// this method does not mutate state and is thus thread-safe
@Override
- public <C extends RpcGateway> Future<C> connect(final String address, final Class<C> clazz) {
+ public <C extends RpcGateway> CompletableFuture<C> connect(final String address, final Class<C> clazz) {
checkState(!stopped, "RpcService is stopped");
LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
@@ -186,14 +184,14 @@ public class AkkaRpcService implements RpcService {
}
}, actorSystem.dispatcher());
- return new FlinkFuture<>(resultFuture);
+ return FutureUtils.toJava(resultFuture);
}
@Override
public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
checkNotNull(rpcEndpoint, "rpc endpoint");
- CompletableFuture<Void> terminationFuture = new FlinkCompletableFuture<>();
+ CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);
ActorRef actorRef;
@@ -283,14 +281,10 @@ public class AkkaRpcService implements RpcService {
}
@Override
- public Future<Void> getTerminationFuture() {
- return FlinkFuture.supplyAsync(new Callable<Void>(){
- @Override
- public Void call() throws Exception {
- actorSystem.awaitTermination();
- return null;
- }
- }, getExecutor());
+ public CompletableFuture<Void> getTerminationFuture() {
+ return CompletableFuture.runAsync(
+ actorSystem::awaitTermination,
+ getExecutor());
}
@Override
@@ -317,10 +311,10 @@ public class AkkaRpcService implements RpcService {
}
@Override
- public <T> Future<T> execute(Callable<T> callable) {
+ public <T> CompletableFuture<T> execute(Callable<T> callable) {
scala.concurrent.Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher());
- return new FlinkFuture<>(scalaFuture);
+ return FutureUtils.toJava(scalaFuture);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 71933fe..2ebf3c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -379,8 +378,8 @@ public class JobLeaderService {
protected CompletableFuture<RegistrationResponse> invokeRegistration(
JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
{
- return FutureUtils.toJava(gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation,
- leaderId, Time.milliseconds(timeoutMillis)));
+ return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation,
+ leaderId, Time.milliseconds(timeoutMillis));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index aa4d6d2..effa498 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -768,12 +767,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
reservedSlots.add(offer);
}
- CompletableFuture<Iterable<SlotOffer>> acceptedSlotsFuture = FutureUtils.toJava(
- jobMasterGateway.offerSlots(
- getResourceID(),
- reservedSlots,
- leaderId,
- taskManagerConfiguration.getTimeout()));
+ CompletableFuture<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
+ getResourceID(),
+ reservedSlots,
+ leaderId,
+ taskManagerConfiguration.getTimeout());
acceptedSlotsFuture.whenCompleteAsync(
(Iterable<SlotOffer> acceptedSlots, Throwable throwable) -> {
@@ -985,8 +983,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
{
final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
- CompletableFuture<Acknowledge> futureAcknowledge = FutureUtils.toJava(
- jobMasterGateway.updateTaskExecutionState(jobMasterLeaderId, taskExecutionState));
+ CompletableFuture<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(jobMasterLeaderId, taskExecutionState);
futureAcknowledge.whenCompleteAsync(
(ack, throwable) -> {
@@ -1348,10 +1345,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
@Override
public CompletableFuture<SlotReport> retrievePayload() {
- return FutureUtils.toJava(
- callAsync(
+ return callAsync(
() -> taskSlotTable.createSlotReport(getResourceID()),
- taskManagerConfiguration.getTimeout()));
+ taskManagerConfiguration.getTimeout());
}
}
}
[2/3] flink git commit: [FLINK-7334] [futures] Replace Flink's
futures with Java 8's CompletableFuture in RpcEndpoint,
RpcGateways and RpcService
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index d4afdbd..8084154 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskmanager.Task;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
/**
* {@link TaskExecutor} RPC gateway interface
@@ -48,7 +48,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param resourceManagerLeaderId current leader id of the ResourceManager
* @return answer to the slot request
*/
- Future<Acknowledge> requestSlot(
+ CompletableFuture<Acknowledge> requestSlot(
SlotID slotId,
JobID jobId,
AllocationID allocationId,
@@ -64,7 +64,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param timeout of the submit operation
* @return Future acknowledge of the successful operation
*/
- Future<Acknowledge> submitTask(
+ CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd,
UUID leaderId,
@RpcTimeout Time timeout);
@@ -77,7 +77,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param timeout for the update partitions operation
* @return Future acknowledge if the partitions have been successfully updated
*/
- Future<Acknowledge> updatePartitions(
+ CompletableFuture<Acknowledge> updatePartitions(
ExecutionAttemptID executionAttemptID,
Iterable<PartitionInfo> partitionInfos,
@RpcTimeout Time timeout);
@@ -99,7 +99,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param checkpointOptions for performing the checkpoint
* @return Future acknowledge if the checkpoint has been successfully triggered
*/
- Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
+ CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
/**
* Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
@@ -110,7 +110,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
* @return Future acknowledge if the checkpoint has been successfully confirmed
*/
- Future<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
+ CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
/**
* Stop the given task.
@@ -119,7 +119,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param timeout for the stop operation
* @return Future acknowledge if the task is successfully stopped
*/
- Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
+ CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
/**
* Cancel the given task.
@@ -128,7 +128,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param timeout for the cancel operation
* @return Future acknowledge if the task is successfully canceled
*/
- Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
+ CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
/**
* Heartbeat request from the job manager
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 4f91166..4084d67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
@@ -156,7 +155,7 @@ public class TaskExecutorToResourceManagerConnection
ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
Time timeout = Time.milliseconds(timeoutMillis);
- return FutureUtils.toJava(resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout));
+ return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 78b49ef..b077b76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -52,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -162,7 +162,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
}
// export the termination future for caller to know it is terminated
- public Future<Void> getTerminationFuture() {
+ public CompletableFuture<Void> getTerminationFuture() {
return taskManager.getTerminationFuture();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
index 3b9da48..a919c78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskexecutor.rpc;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -32,6 +31,7 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
public class RpcInputSplitProvider implements InputSplitProvider {
private final UUID jobMasterLeaderId;
@@ -61,7 +61,7 @@ public class RpcInputSplitProvider implements InputSplitProvider {
public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
Preconditions.checkNotNull(userCodeClassLoader);
- Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
+ CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
jobMasterLeaderId, jobVertexID, executionAttemptID);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
index 07d04e6..26e1b0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.taskexecutor.rpc;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -46,6 +45,6 @@ public class RpcPartitionStateChecker implements PartitionProducerStateChecker {
IntermediateDataSetID resultId,
ResultPartitionID partitionId) {
- return FutureUtils.toJava(jobMasterGateway.requestPartitionState(jobMasterLeaderId, resultId, partitionId));
+ return jobMasterGateway.requestPartitionState(jobMasterLeaderId, resultId, partitionId);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
index cf01d5a..d898562 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.taskexecutor.rpc;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -32,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
public class RpcResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
@@ -55,18 +54,17 @@ public class RpcResultPartitionConsumableNotifier implements ResultPartitionCons
}
@Override
public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
- Future<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(
+ CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(
jobMasterLeaderId, partitionId, timeout);
- acknowledgeFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
- @Override
- public Void apply(Throwable value) {
- LOG.error("Could not schedule or update consumers at the JobManager.", value);
+ acknowledgeFuture.whenCompleteAsync(
+ (Acknowledge ack, Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.error("Could not schedule or update consumers at the JobManager.", throwable);
- taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", value));
-
- return null;
- }
- }, executor);
+ taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable));
+ }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a6712ad..ef4fa86 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -22,7 +22,7 @@ import java.io.IOException
import java.net._
import java.util.UUID
import java.util.concurrent.{Future => JavaFuture, _}
-import java.util.function.BiFunction
+import java.util.function.{BiFunction, Consumer}
import akka.actor.Status.{Failure, Success}
import akka.actor._
@@ -1105,17 +1105,18 @@ class JobManager(
val originalSender = new AkkaActorGateway(sender(), leaderSessionID.orNull)
- val sendingFuture = stackTraceFuture.thenAccept(new AcceptFunction[StackTrace] {
- override def accept(value: StackTrace): Unit = {
- originalSender.tell(value)
- }
- })
+ val sendingFuture = stackTraceFuture.thenAccept(
+ new Consumer[StackTrace]() {
+ override def accept(value: StackTrace): Unit = {
+ originalSender.tell(value)
+ }
+ })
- sendingFuture.exceptionally(new ApplyFunction[Throwable, Void] {
+ sendingFuture.exceptionally(new java.util.function.Function[Throwable, Void] {
override def apply(value: Throwable): Void = {
log.info("Could not send requested stack trace.", value)
- return null
+ null
}
})
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
index 09e829e..37a4547 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -18,9 +18,6 @@
package org.apache.flink.runtime.akka;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -43,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -125,7 +123,7 @@ public class QuarantineMonitorTest extends TestLogger {
// start watching the watchee
watcher.tell(new Watch(watcheeAddress), ActorRef.noSender());
- Future<String> quarantineFuture = quarantineHandler.getWasQuarantinedByFuture();
+ CompletableFuture<String> quarantineFuture = quarantineHandler.getWasQuarantinedByFuture();
Assert.assertEquals(actorSystem1Address.toString(), quarantineFuture.get());
} finally {
@@ -166,7 +164,7 @@ public class QuarantineMonitorTest extends TestLogger {
// start watching the watchee
watcher.tell(new Watch(watcheeAddress), ActorRef.noSender());
- Future<String> quarantineFuture = quarantineHandler.getHasQuarantinedFuture();
+ CompletableFuture<String> quarantineFuture = quarantineHandler.getHasQuarantinedFuture();
Assert.assertEquals(actorSystem1Address.toString(), quarantineFuture.get());
} finally {
@@ -182,8 +180,8 @@ public class QuarantineMonitorTest extends TestLogger {
private final CompletableFuture<String> hasQuarantinedFuture;
public TestingQuarantineHandler() {
- this.wasQuarantinedByFuture = new FlinkCompletableFuture<>();
- this.hasQuarantinedFuture = new FlinkCompletableFuture<>();
+ this.wasQuarantinedByFuture = new CompletableFuture<>();
+ this.hasQuarantinedFuture = new CompletableFuture<>();
}
@Override
@@ -196,11 +194,11 @@ public class QuarantineMonitorTest extends TestLogger {
hasQuarantinedFuture.complete(remoteSystem);
}
- public Future<String> getWasQuarantinedByFuture() {
+ public CompletableFuture<String> getWasQuarantinedByFuture() {
return wasQuarantinedByFuture;
}
- public Future<String> getHasQuarantinedFuture() {
+ public CompletableFuture<String> getHasQuarantinedFuture() {
return hasQuarantinedFuture;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 5df5c58..e23f6a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -339,10 +338,10 @@ public class CheckpointCoordinatorMasterHooksTest {
final MasterTriggerRestoreHook<Void> hook = mockGeneric(MasterTriggerRestoreHook.class);
when(hook.getIdentifier()).thenReturn(id);
when(hook.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class))).thenAnswer(
- new Answer<Future<Void>>() {
+ new Answer<CompletableFuture<Void>>() {
@Override
- public Future<Void> answer(InvocationOnMock invocation) throws Throwable {
+ public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
assertEquals(1, cc.getNumberOfPendingCheckpoints());
long checkpointId = (Long) invocation.getArguments()[0];
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index f1bc43b..e1144c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
@@ -71,6 +70,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -524,7 +524,7 @@ public class ResourceManagerTest extends TestLogger {
final SlotReport slotReport = new SlotReport();
// test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time
- Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(
+ CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(
rmLeaderSessionId,
taskManagerAddress,
taskManagerResourceID,
@@ -622,7 +622,7 @@ public class ResourceManagerTest extends TestLogger {
rmLeaderElectionService.isLeader(rmLeaderId);
// test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time
- Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
rmLeaderId,
jmLeaderId,
jmResourceId,
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index a7a86c3..267f10b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
@@ -42,6 +41,8 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.concurrent.CompletableFuture;
+
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -86,7 +87,7 @@ public class DispatcherTest extends TestLogger {
DispatcherGateway dispatcherGateway = dispatcher.getSelf();
- Future<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+ CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
acknowledgeFuture.get();
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index bfcab87..f4e8b30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -120,7 +119,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
when(rootSlot.getSlotNumber()).thenReturn(0);
- when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index d3086a8..b88a928 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
@@ -576,7 +575,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
private static TaskManagerGateway createTaskManager() {
TaskManagerGateway tm = mock(TaskManagerGateway.class);
when(tm.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ .thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
return tm;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
index effe417..de9081b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.StoppingException;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -37,6 +36,8 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.util.concurrent.CompletableFuture;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -155,9 +156,9 @@ public class ExecutionGraphStopTest extends TestLogger {
final TaskManagerGateway gateway = mock(TaskManagerGateway.class);
when(gateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ .thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
when(gateway.stopTask(any(ExecutionAttemptID.class), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ .thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
index f453d20..be68532 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
@@ -19,14 +19,15 @@
package org.apache.flink.runtime.executiongraph.utils;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
+import java.util.concurrent.CompletableFuture;
+
public class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
@Override
- public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
- return new FlinkCompletableFuture<>();
+ public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ return new CompletableFuture<>();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 6e67c1a..b968d39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.messages.StackTrace;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
/**
* A TaskManagerGateway that simply acks the basic operations (deploy, cancel, update) and does not
@@ -56,39 +56,39 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
public void stopCluster(ApplicationStatus applicationStatus, String message) {}
@Override
- public Future<StackTrace> requestStackTrace(Time timeout) {
- return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+ public CompletableFuture<StackTrace> requestStackTrace(Time timeout) {
+ return FutureUtils.completedExceptionally(new UnsupportedOperationException());
}
@Override
- public Future<StackTraceSampleResponse> requestStackTraceSample(
+ public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
ExecutionAttemptID executionAttemptID,
int sampleId,
int numSamples,
Time delayBetweenSamples,
int maxStackTraceDepth,
Time timeout) {
- return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+ return FutureUtils.completedExceptionally(new UnsupportedOperationException());
}
@Override
- public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
- return FlinkCompletableFuture.completed(Acknowledge.get());
+ public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
- public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
- return FlinkCompletableFuture.completed(Acknowledge.get());
+ public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
- public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
- return FlinkCompletableFuture.completed(Acknowledge.get());
+ public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
- public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
- return FlinkCompletableFuture.completed(Acknowledge.get());
+ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
@@ -110,12 +110,12 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
CheckpointOptions checkpointOptions) {}
@Override
- public Future<BlobKey> requestTaskManagerLog(Time timeout) {
- return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+ public CompletableFuture<BlobKey> requestTaskManagerLog(Time timeout) {
+ return FutureUtils.completedExceptionally(new UnsupportedOperationException());
}
@Override
- public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
- return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+ public CompletableFuture<BlobKey> requestTaskManagerStdout(Time timeout) {
+ return FutureUtils.completedExceptionally(new UnsupportedOperationException());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index b444640..4cc4f11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.rpc.RpcService;
@@ -35,6 +34,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -82,7 +82,7 @@ public class SlotPoolRpcTest {
);
pool.start(UUID.randomUUID(), "foobar");
- Future<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
try {
future.get(4, TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index cf95461..3e2293b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -41,6 +40,7 @@ import org.mockito.ArgumentCaptor;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
@@ -85,7 +85,7 @@ public class SlotPoolTest extends TestLogger {
this.resourceManagerGateway = mock(ResourceManagerGateway.class);
when(resourceManagerGateway
.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
- .thenReturn(mock(Future.class, RETURNS_MOCKS));
+ .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
slotPool.connectToResourceManager(UUID.randomUUID(), resourceManagerGateway);
}
@@ -101,7 +101,7 @@ public class SlotPoolTest extends TestLogger {
slotPool.registerTaskManager(resourceID);
ScheduledUnit task = mock(ScheduledUnit.class);
- Future<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null);
assertFalse(future.isDone());
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -126,8 +126,8 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);
- Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
- Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
assertFalse(future1.isDone());
assertFalse(future2.isDone());
@@ -165,7 +165,7 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);
- Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
assertFalse(future1.isDone());
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -182,7 +182,7 @@ public class SlotPoolTest extends TestLogger {
// return this slot to pool
slot1.releaseSlot();
- Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
// second allocation fulfilled by previous slot returning
SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
@@ -200,7 +200,7 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);
- Future<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
assertFalse(future.isDone());
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -241,14 +241,14 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);
- Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
- Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPool.offerSlot(allocatedSlot));
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 0b25e6c..48a1d45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -34,7 +33,6 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -50,6 +48,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.net.InetAddress;
import java.net.URL;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -177,7 +176,7 @@ public class JobMasterTest extends TestLogger {
anyString(),
any(JobID.class),
any(Time.class)
- )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JobMasterRegistrationSuccess(
+ )).thenReturn(CompletableFuture.completedFuture(new JobMasterRegistrationSuccess(
heartbeatInterval, rmLeaderId, rmResourceId)));
final TestingSerialRpcService rpc = new TestingSerialRpcService();
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 9a4917a..da992bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.registration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.util.TestLogger;
@@ -123,8 +122,8 @@ public class RetryingRegistrationTest extends TestLogger {
// RPC service that fails upon the first connection, but succeeds on the second
RpcService rpc = mock(RpcService.class);
when(rpc.connect(anyString(), any(Class.class))).thenReturn(
- FlinkCompletableFuture.completedExceptionally(new Exception("test connect failure")), // first connection attempt fails
- FlinkCompletableFuture.completed(testGateway) // second connection attempt succeeds
+ FutureUtils.completedExceptionally(new Exception("test connect failure")), // first connection attempt fails
+ CompletableFuture.completedFuture(testGateway) // second connection attempt succeeds
);
when(rpc.getExecutor()).thenReturn(executor);
@@ -245,8 +244,8 @@ public class RetryingRegistrationTest extends TestLogger {
TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(
- FlinkCompletableFuture.<RegistrationResponse>completedExceptionally(new Exception("test exception")),
- FlinkCompletableFuture.<RegistrationResponse>completed(new TestRegistrationSuccess(testId)));
+ FutureUtils.completedExceptionally(new Exception("test exception")),
+ CompletableFuture.completedFuture(new TestRegistrationSuccess(testId)));
rpc.registerGateway(testEndpointAddress, testGateway);
@@ -281,7 +280,7 @@ public class RetryingRegistrationTest extends TestLogger {
TestingRpcService rpc = new TestingRpcService();
try {
- FlinkCompletableFuture<RegistrationResponse> result = new FlinkCompletableFuture<>();
+ CompletableFuture<RegistrationResponse> result = new CompletableFuture<>();
TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result);
@@ -340,7 +339,7 @@ public class RetryingRegistrationTest extends TestLogger {
@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(
TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
- return FutureUtils.toJava(gateway.registrationCall(leaderId, timeoutMillis));
+ return gateway.registrationCall(leaderId, timeoutMillis);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
index 1b23fa3..4cfbc12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
@@ -18,13 +18,12 @@
package org.apache.flink.runtime.registration;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.rpc.TestingGatewayBase;
import org.apache.flink.util.Preconditions;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
/**
@@ -47,7 +46,7 @@ public class TestRegistrationGateway extends TestingGatewayBase {
// ------------------------------------------------------------------------
- public Future<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
+ public CompletableFuture<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
invocations.add(new RegistrationCall(leaderId, timeout));
RegistrationResponse response = responses[pos];
@@ -56,7 +55,7 @@ public class TestRegistrationGateway extends TestingGatewayBase {
}
// return a completed future (for a proper value), or one that never completes and will time out (for null)
- return response != null ? FlinkCompletableFuture.completed(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
+ return response != null ? CompletableFuture.completedFuture(response) : futureWithTimeout(timeout);
}
public BlockingQueue<RegistrationCall> getInvocations() {
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
index 4d5964e..7b8703e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
@@ -34,6 +33,7 @@ import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -83,7 +83,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
jobLeaderIdService.addJob(jobId);
- Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+ CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
// notify the leader id service about the new leader
leaderRetrievalService.notifyListener(address, leaderId);
@@ -117,7 +117,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
jobLeaderIdService.addJob(jobId);
- Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+ CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
// remove the job before we could find a leader
jobLeaderIdService.removeJob(jobId);
@@ -228,7 +228,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
jobLeaderIdService.addJob(jobId);
- Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+ CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
// notify the leader id service about the new leader
leaderRetrievalService.notifyListener(address, leaderId);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 4836f74..6480d75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -43,6 +42,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue;
@@ -74,11 +74,11 @@ public class ResourceManagerJobMasterTest extends TestLogger {
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
- final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+ final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
// test response successful
- Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
rmLeaderSessionId,
jmLeaderID,
jmResourceId,
@@ -104,12 +104,12 @@ public class ResourceManagerJobMasterTest extends TestLogger {
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
- final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+ final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
UUID differentLeaderSessionID = UUID.randomUUID();
- Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
differentLeaderSessionID,
jmLeaderID,
jmResourceId,
@@ -134,14 +134,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
"localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID);
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
- final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+ final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
UUID differentLeaderSessionID = UUID.randomUUID();
- Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
rmLeaderSessionId,
differentLeaderSessionID,
jmResourceId,
@@ -166,14 +166,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
"localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID);
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
- final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+ final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
// test throw exception when receive a registration from job master which takes invalid address
String invalidAddress = "/jobMasterAddress2";
- Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(
rmLeaderSessionId,
jmLeaderSessionId,
jmResourceId,
@@ -198,14 +198,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
"localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID);
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
- final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+ final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
JobID unknownJobIDToHAServices = new JobID();
// verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener
- Future<RegistrationResponse> declineFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> declineFuture = resourceManager.registerJobManager(
rmLeaderSessionId,
jmLeaderSessionId,
jmResourceId,
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 85b7eb4..4127cea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -41,6 +40,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertNotEquals;
@@ -89,13 +89,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
public void testRegisterTaskExecutor() throws Exception {
try {
// test response successful
- Future<RegistrationResponse> successfulFuture =
+ CompletableFuture<RegistrationResponse> successfulFuture =
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
assertTrue(response instanceof TaskExecutorRegistrationSuccess);
// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
- Future<RegistrationResponse> duplicateFuture =
+ CompletableFuture<RegistrationResponse> duplicateFuture =
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
RegistrationResponse duplicateResponse = duplicateFuture.get();
assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
@@ -115,7 +115,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
try {
// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
UUID differentLeaderSessionID = UUID.randomUUID();
- Future<RegistrationResponse> unMatchedLeaderFuture =
+ CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport);
assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
} finally {
@@ -133,7 +133,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
try {
// test throw exception when receive a registration from taskExecutor which takes invalid address
String invalidAddress = "/taskExecutor2";
- Future<RegistrationResponse> invalidAddressFuture =
+ CompletableFuture<RegistrationResponse> invalidAddressFuture =
resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport);
assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
} finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 39c5f25..93e96a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
-import org.apache.flink.runtime.concurrent.*;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -44,7 +44,7 @@ import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -114,7 +114,7 @@ public class SlotManagerTest extends TestLogger {
any(AllocationID.class),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(new FlinkCompletableFuture<Acknowledge>());
+ any(Time.class))).thenReturn(new CompletableFuture<>());
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -241,7 +241,7 @@ public class SlotManagerTest extends TestLogger {
eq(allocationId),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -280,7 +280,7 @@ public class SlotManagerTest extends TestLogger {
any(AllocationID.class),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(new FlinkCompletableFuture<Acknowledge>());
+ any(Time.class))).thenReturn(new CompletableFuture<>());
final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
@@ -338,7 +338,7 @@ public class SlotManagerTest extends TestLogger {
eq(allocationId),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -482,7 +482,7 @@ public class SlotManagerTest extends TestLogger {
any(AllocationID.class),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -527,7 +527,7 @@ public class SlotManagerTest extends TestLogger {
any(AllocationID.class),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -747,8 +747,8 @@ public class SlotManagerTest extends TestLogger {
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
- final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = new FlinkCompletableFuture<>();
- final FlinkCompletableFuture<Acknowledge> slotRequestFuture2 = new FlinkCompletableFuture<>();
+ final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
+ final CompletableFuture<Acknowledge> slotRequestFuture2 = new CompletableFuture<>();
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
when(taskExecutorGateway.requestSlot(
@@ -826,7 +826,7 @@ public class SlotManagerTest extends TestLogger {
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
- final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = new FlinkCompletableFuture<>();
+ final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
when(taskExecutorGateway.requestSlot(
@@ -835,7 +835,7 @@ public class SlotManagerTest extends TestLogger {
eq(allocationId),
anyString(),
any(UUID.class),
- any(Time.class))).thenReturn(slotRequestFuture1, FlinkCompletableFuture.completed(Acknowledge.get()));
+ any(Time.class))).thenReturn(slotRequestFuture1, CompletableFuture.completedFuture(Acknowledge.get()));
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -856,24 +856,21 @@ public class SlotManagerTest extends TestLogger {
slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
- Future<Void> registrationFuture = FlinkFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
+ CompletableFuture<Void> registrationFuture = CompletableFuture.supplyAsync(
+ () -> {
slotManager.registerTaskManager(taskManagerConnection, slotReport);
return null;
- }
- }, mainThreadExecutor)
- .thenAccept(new AcceptFunction<Void>() {
- @Override
- public void accept(Void value) {
+ },
+ mainThreadExecutor)
+ .thenAccept(
+ (Object value) -> {
try {
slotManager.registerSlotRequest(slotRequest);
} catch (SlotManagerException e) {
throw new RuntimeException("Could not register slots.", e);
}
- }
- });
+ });
// check that no exception has been thrown
registrationFuture.get();
@@ -891,12 +888,9 @@ public class SlotManagerTest extends TestLogger {
final SlotID requestedSlotId = slotIdCaptor.getValue();
final SlotID freeSlotId = requestedSlotId.equals(slotId1) ? slotId2 : slotId1;
- Future<Boolean> freeSlotFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return slotManager.getSlot(freeSlotId).isFree();
- }
- }, mainThreadExecutor);
+ CompletableFuture<Boolean> freeSlotFuture = CompletableFuture.supplyAsync(
+ () -> slotManager.getSlot(freeSlotId).isFree(),
+ mainThreadExecutor);
assertTrue(freeSlotFuture.get());
@@ -904,15 +898,10 @@ public class SlotManagerTest extends TestLogger {
final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
- FlinkFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- // this should update the slot with the pending slot request triggering the reassignment of it
- slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport);
-
- return null;
- }
- }, mainThreadExecutor);
+ CompletableFuture.supplyAsync(
+ // this should update the slot with the pending slot request triggering the reassignment of it
+ () -> slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport),
+ mainThreadExecutor);
verify(taskExecutorGateway, timeout(verifyTimeout).times(2)).requestSlot(
slotIdCaptor.capture(),
@@ -926,12 +915,9 @@ public class SlotManagerTest extends TestLogger {
assertEquals(slotId2, requestedSlotId2);
- Future<TaskManagerSlot> requestedSlotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
- @Override
- public TaskManagerSlot call() throws Exception {
- return slotManager.getSlot(requestedSlotId2);
- }
- }, mainThreadExecutor);
+ CompletableFuture<TaskManagerSlot> requestedSlotFuture = CompletableFuture.supplyAsync(
+ () -> slotManager.getSlot(requestedSlotId2),
+ mainThreadExecutor);
TaskManagerSlot slot = requestedSlotFuture.get();
@@ -967,7 +953,7 @@ public class SlotManagerTest extends TestLogger {
eq(allocationId),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -987,20 +973,16 @@ public class SlotManagerTest extends TestLogger {
slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
- FlinkFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- slotManager.registerSlotRequest(slotRequest);
-
- return null;
- }
- }, mainThreadExecutor)
- .thenAccept(new AcceptFunction<Void>() {
- @Override
- public void accept(Void value) {
- slotManager.registerTaskManager(taskManagerConnection, initialSlotReport);
- }
- });
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return slotManager.registerSlotRequest(slotRequest);
+ } catch (SlotManagerException e) {
+ throw new FlinkFutureException(e);
+ }
+ },
+ mainThreadExecutor)
+ .thenAccept((Object value) -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class);
@@ -1012,44 +994,28 @@ public class SlotManagerTest extends TestLogger {
eq(leaderId),
any(Time.class));
- Future<Boolean> idleFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
- }
- }, mainThreadExecutor);
+ CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync(
+ () -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()),
+ mainThreadExecutor);
// check that the TaskManaer is not idle
assertFalse(idleFuture.get());
final SlotID slotId = slotIdArgumentCaptor.getValue();
- Future<TaskManagerSlot> slotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
- @Override
- public TaskManagerSlot call() throws Exception {
- return slotManager.getSlot(slotId);
- }
- }, mainThreadExecutor);
+ CompletableFuture<TaskManagerSlot> slotFuture = CompletableFuture.supplyAsync(
+ () -> slotManager.getSlot(slotId),
+ mainThreadExecutor);
TaskManagerSlot slot = slotFuture.get();
assertTrue(slot.isAllocated());
assertEquals(allocationId, slot.getAllocationId());
- Future<Boolean> idleFuture2 = FlinkFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- slotManager.freeSlot(slotId, allocationId);
-
- return null;
- }
- }, mainThreadExecutor)
- .thenApply(new ApplyFunction<Void, Boolean>() {
- @Override
- public Boolean apply(Void value) {
- return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
- }
- });
+ CompletableFuture<Boolean> idleFuture2 = CompletableFuture.runAsync(
+ () -> slotManager.freeSlot(slotId, allocationId),
+ mainThreadExecutor)
+ .thenApply((Object value) -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()));
assertTrue(idleFuture2.get());
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index a1ab1ab..844e159 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.SlotReport;
@@ -41,6 +40,7 @@ import org.mockito.Mockito;
import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -104,7 +104,7 @@ public class SlotProtocolTest extends TestLogger {
Mockito.when(
taskExecutorGateway
.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
- .thenReturn(mock(FlinkFuture.class));
+ .thenReturn(mock(CompletableFuture.class));
final ResourceID resourceID = ResourceID.generate();
final SlotID slotID = new SlotID(resourceID, 0);
@@ -139,7 +139,7 @@ public class SlotProtocolTest extends TestLogger {
Mockito.when(
taskExecutorGateway
.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
- .thenReturn(mock(FlinkFuture.class));
+ .thenReturn(mock(CompletableFuture.class));
try (SlotManager slotManager = new SlotManager(
scheduledExecutor,
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index e636d6c..4be5257 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -23,14 +23,13 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Test;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
@@ -90,9 +89,8 @@ public class AsyncCallsTest extends TestLogger {
});
}
- Future<String> result = testEndpoint.callAsync(new Callable<String>() {
- @Override
- public String call() throws Exception {
+ CompletableFuture<String> result = testEndpoint.callAsync(
+ () -> {
boolean holdsLock = lock.tryLock();
if (holdsLock) {
lock.unlock();
@@ -100,8 +98,8 @@ public class AsyncCallsTest extends TestLogger {
concurrentAccess.set(true);
}
return "test";
- }
- }, Time.seconds(30L));
+ },
+ Time.seconds(30L));
String str = result.get(30, TimeUnit.SECONDS);
assertEquals("test", str);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index bbde331..07dadae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.rpc;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.util.ReflectionUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -42,6 +41,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -74,7 +74,7 @@ public class RpcCompletenessTest extends TestLogger {
private static Logger LOG = LoggerFactory.getLogger(RpcCompletenessTest.class);
- private static final Class<?> futureClass = Future.class;
+ private static final Class<?> futureClass = CompletableFuture.class;
private static final Class<?> timeoutClass = Time.class;
@Test
@@ -195,7 +195,7 @@ public class RpcCompletenessTest extends TestLogger {
/**
* Checks whether the gateway method fulfills the gateway method requirements.
* <ul>
- * <li>It checks whether the return type is void or a {@link Future} wrapping the actual result. </li>
+ * <li>It checks whether the return type is void or a {@link CompletableFuture} wrapping the actual result. </li>
* <li>It checks that the method's parameter list contains at most one parameter annotated with {@link RpcTimeout}.</li>
* </ul>
*
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index e05c8d8..4220fff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -23,7 +23,6 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -33,6 +32,7 @@ import org.junit.Test;
import scala.Option;
import scala.Tuple2;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -57,7 +57,7 @@ public class RpcConnectionTest {
// can only pass if the connection problem is not recognized merely via a timeout
rpcService = new AkkaRpcService(actorSystem, Time.of(10000000, TimeUnit.SECONDS));
- Future<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
+ CompletableFuture<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
future.get(10000000, TimeUnit.SECONDS);
fail("should never complete normally");
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
index 03fe84b..ccf0acd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -18,10 +18,7 @@
package org.apache.flink.runtime.rpc;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -77,8 +74,8 @@ public abstract class TestingGatewayBase implements RpcGateway {
// utilities
// ------------------------------------------------------------------------
- public <T> Future<T> futureWithTimeout(long timeoutMillis) {
- FlinkCompletableFuture<T> future = new FlinkCompletableFuture<>();
+ public <T> CompletableFuture<T> futureWithTimeout(long timeoutMillis) {
+ CompletableFuture<T> future = new CompletableFuture<>();
executor.schedule(new FutureTimeout(future), timeoutMillis, TimeUnit.MILLISECONDS);
return future;
}