You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/03 11:49:04 UTC
[2/3] flink git commit: [FLINK-7334] [futures] Replace Flink's futures with Java 8's CompletableFuture in RpcEndpoint, RpcGateways and RpcService
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index d4afdbd..8084154 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskmanager.Task;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
/**
* {@link TaskExecutor} RPC gateway interface
@@ -48,7 +48,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param resourceManagerLeaderId current leader id of the ResourceManager
* @return answer to the slot request
*/
- Future<Acknowledge> requestSlot(
+ CompletableFuture<Acknowledge> requestSlot(
SlotID slotId,
JobID jobId,
AllocationID allocationId,
@@ -64,7 +64,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param timeout of the submit operation
* @return Future acknowledge of the successful operation
*/
- Future<Acknowledge> submitTask(
+ CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd,
UUID leaderId,
@RpcTimeout Time timeout);
@@ -77,7 +77,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param timeout for the update partitions operation
* @return Future acknowledge if the partitions have been successfully updated
*/
- Future<Acknowledge> updatePartitions(
+ CompletableFuture<Acknowledge> updatePartitions(
ExecutionAttemptID executionAttemptID,
Iterable<PartitionInfo> partitionInfos,
@RpcTimeout Time timeout);
@@ -99,7 +99,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param checkpointOptions for performing the checkpoint
* @return Future acknowledge if the checkpoint has been successfully triggered
*/
- Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
+ CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
/**
* Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
@@ -110,7 +110,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
* @return Future acknowledge if the checkpoint has been successfully confirmed
*/
- Future<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
+ CompletableFuture<Acknowledge> confirmCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
/**
* Stop the given task.
@@ -119,7 +119,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param timeout for the stop operation
* @return Future acknowledge if the task is successfully stopped
*/
- Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
+ CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
/**
* Cancel the given task.
@@ -128,7 +128,7 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param timeout for the cancel operation
* @return Future acknowledge if the task is successfully canceled
*/
- Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
+ CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
/**
* Heartbeat request from the job manager
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 4f91166..4084d67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
@@ -156,7 +155,7 @@ public class TaskExecutorToResourceManagerConnection
ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
Time timeout = Time.milliseconds(timeoutMillis);
- return FutureUtils.toJava(resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout));
+ return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 78b49ef..b077b76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -52,6 +51,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -162,7 +162,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
}
// export the termination future for caller to know it is terminated
- public Future<Void> getTerminationFuture() {
+ public CompletableFuture<Void> getTerminationFuture() {
return taskManager.getTerminationFuture();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
index 3b9da48..a919c78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskexecutor.rpc;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -32,6 +31,7 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
public class RpcInputSplitProvider implements InputSplitProvider {
private final UUID jobMasterLeaderId;
@@ -61,7 +61,7 @@ public class RpcInputSplitProvider implements InputSplitProvider {
public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
Preconditions.checkNotNull(userCodeClassLoader);
- Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
+ CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
jobMasterLeaderId, jobVertexID, executionAttemptID);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
index 07d04e6..26e1b0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.taskexecutor.rpc;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -46,6 +45,6 @@ public class RpcPartitionStateChecker implements PartitionProducerStateChecker {
IntermediateDataSetID resultId,
ResultPartitionID partitionId) {
- return FutureUtils.toJava(jobMasterGateway.requestPartitionState(jobMasterLeaderId, resultId, partitionId));
+ return jobMasterGateway.requestPartitionState(jobMasterLeaderId, resultId, partitionId);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
index cf01d5a..d898562 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.taskexecutor.rpc;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -32,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
public class RpcResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
@@ -55,18 +54,17 @@ public class RpcResultPartitionConsumableNotifier implements ResultPartitionCons
}
@Override
public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
- Future<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(
+ CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(
jobMasterLeaderId, partitionId, timeout);
- acknowledgeFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
- @Override
- public Void apply(Throwable value) {
- LOG.error("Could not schedule or update consumers at the JobManager.", value);
+ acknowledgeFuture.whenCompleteAsync(
+ (Acknowledge ack, Throwable throwable) -> {
+ if (throwable != null) {
+ LOG.error("Could not schedule or update consumers at the JobManager.", throwable);
- taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", value));
-
- return null;
- }
- }, executor);
+ taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", throwable));
+ }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a6712ad..ef4fa86 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -22,7 +22,7 @@ import java.io.IOException
import java.net._
import java.util.UUID
import java.util.concurrent.{Future => JavaFuture, _}
-import java.util.function.BiFunction
+import java.util.function.{BiFunction, Consumer}
import akka.actor.Status.{Failure, Success}
import akka.actor._
@@ -1105,17 +1105,18 @@ class JobManager(
val originalSender = new AkkaActorGateway(sender(), leaderSessionID.orNull)
- val sendingFuture = stackTraceFuture.thenAccept(new AcceptFunction[StackTrace] {
- override def accept(value: StackTrace): Unit = {
- originalSender.tell(value)
- }
- })
+ val sendingFuture = stackTraceFuture.thenAccept(
+ new Consumer[StackTrace]() {
+ override def accept(value: StackTrace): Unit = {
+ originalSender.tell(value)
+ }
+ })
- sendingFuture.exceptionally(new ApplyFunction[Throwable, Void] {
+ sendingFuture.exceptionally(new java.util.function.Function[Throwable, Void] {
override def apply(value: Throwable): Void = {
log.info("Could not send requested stack trace.", value)
- return null
+ null
}
})
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
index 09e829e..37a4547 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -18,9 +18,6 @@
package org.apache.flink.runtime.akka;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -43,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -125,7 +123,7 @@ public class QuarantineMonitorTest extends TestLogger {
// start watching the watchee
watcher.tell(new Watch(watcheeAddress), ActorRef.noSender());
- Future<String> quarantineFuture = quarantineHandler.getWasQuarantinedByFuture();
+ CompletableFuture<String> quarantineFuture = quarantineHandler.getWasQuarantinedByFuture();
Assert.assertEquals(actorSystem1Address.toString(), quarantineFuture.get());
} finally {
@@ -166,7 +164,7 @@ public class QuarantineMonitorTest extends TestLogger {
// start watching the watchee
watcher.tell(new Watch(watcheeAddress), ActorRef.noSender());
- Future<String> quarantineFuture = quarantineHandler.getHasQuarantinedFuture();
+ CompletableFuture<String> quarantineFuture = quarantineHandler.getHasQuarantinedFuture();
Assert.assertEquals(actorSystem1Address.toString(), quarantineFuture.get());
} finally {
@@ -182,8 +180,8 @@ public class QuarantineMonitorTest extends TestLogger {
private final CompletableFuture<String> hasQuarantinedFuture;
public TestingQuarantineHandler() {
- this.wasQuarantinedByFuture = new FlinkCompletableFuture<>();
- this.hasQuarantinedFuture = new FlinkCompletableFuture<>();
+ this.wasQuarantinedByFuture = new CompletableFuture<>();
+ this.hasQuarantinedFuture = new CompletableFuture<>();
}
@Override
@@ -196,11 +194,11 @@ public class QuarantineMonitorTest extends TestLogger {
hasQuarantinedFuture.complete(remoteSystem);
}
- public Future<String> getWasQuarantinedByFuture() {
+ public CompletableFuture<String> getWasQuarantinedByFuture() {
return wasQuarantinedByFuture;
}
- public Future<String> getHasQuarantinedFuture() {
+ public CompletableFuture<String> getHasQuarantinedFuture() {
return hasQuarantinedFuture;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index 5df5c58..e23f6a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -339,10 +338,10 @@ public class CheckpointCoordinatorMasterHooksTest {
final MasterTriggerRestoreHook<Void> hook = mockGeneric(MasterTriggerRestoreHook.class);
when(hook.getIdentifier()).thenReturn(id);
when(hook.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class))).thenAnswer(
- new Answer<Future<Void>>() {
+ new Answer<CompletableFuture<Void>>() {
@Override
- public Future<Void> answer(InvocationOnMock invocation) throws Throwable {
+ public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
assertEquals(1, cc.getNumberOfPendingCheckpoints());
long checkpointId = (Long) invocation.getArguments()[0];
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index f1bc43b..e1144c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
@@ -71,6 +70,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -524,7 +524,7 @@ public class ResourceManagerTest extends TestLogger {
final SlotReport slotReport = new SlotReport();
// test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time
- Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(
+ CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(
rmLeaderSessionId,
taskManagerAddress,
taskManagerResourceID,
@@ -622,7 +622,7 @@ public class ResourceManagerTest extends TestLogger {
rmLeaderElectionService.isLeader(rmLeaderId);
// test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time
- Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
rmLeaderId,
jmLeaderId,
jmResourceId,
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index a7a86c3..267f10b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
@@ -42,6 +41,8 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.concurrent.CompletableFuture;
+
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -86,7 +87,7 @@ public class DispatcherTest extends TestLogger {
DispatcherGateway dispatcherGateway = dispatcher.getSelf();
- Future<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
+ CompletableFuture<Acknowledge> acknowledgeFuture = dispatcherGateway.submitJob(jobGraph, timeout);
acknowledgeFuture.get();
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index bfcab87..f4e8b30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -120,7 +119,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
when(rootSlot.getSlotNumber()).thenReturn(0);
- when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ when(taskManagerGateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index d3086a8..b88a928 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
@@ -576,7 +575,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
private static TaskManagerGateway createTaskManager() {
TaskManagerGateway tm = mock(TaskManagerGateway.class);
when(tm.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ .thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
return tm;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
index effe417..de9081b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.StoppingException;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -37,6 +36,8 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import java.util.concurrent.CompletableFuture;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -155,9 +156,9 @@ public class ExecutionGraphStopTest extends TestLogger {
final TaskManagerGateway gateway = mock(TaskManagerGateway.class);
when(gateway.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ .thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
when(gateway.stopTask(any(ExecutionAttemptID.class), any(Time.class)))
- .thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ .thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
index f453d20..be68532 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/NotCancelAckingTaskGateway.java
@@ -19,14 +19,15 @@
package org.apache.flink.runtime.executiongraph.utils;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
+import java.util.concurrent.CompletableFuture;
+
public class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
@Override
- public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
- return new FlinkCompletableFuture<>();
+ public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ return new CompletableFuture<>();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 6e67c1a..b968d39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -23,8 +23,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -35,6 +34,7 @@ import org.apache.flink.runtime.messages.StackTrace;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
/**
* A TaskManagerGateway that simply acks the basic operations (deploy, cancel, update) and does not
@@ -56,39 +56,39 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
public void stopCluster(ApplicationStatus applicationStatus, String message) {}
@Override
- public Future<StackTrace> requestStackTrace(Time timeout) {
- return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+ public CompletableFuture<StackTrace> requestStackTrace(Time timeout) {
+ return FutureUtils.completedExceptionally(new UnsupportedOperationException());
}
@Override
- public Future<StackTraceSampleResponse> requestStackTraceSample(
+ public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
ExecutionAttemptID executionAttemptID,
int sampleId,
int numSamples,
Time delayBetweenSamples,
int maxStackTraceDepth,
Time timeout) {
- return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+ return FutureUtils.completedExceptionally(new UnsupportedOperationException());
}
@Override
- public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
- return FlinkCompletableFuture.completed(Acknowledge.get());
+ public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
- public Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
- return FlinkCompletableFuture.completed(Acknowledge.get());
+ public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
- public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
- return FlinkCompletableFuture.completed(Acknowledge.get());
+ public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
- public Future<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
- return FlinkCompletableFuture.completed(Acknowledge.get());
+ public CompletableFuture<Acknowledge> updatePartitions(ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
}
@Override
@@ -110,12 +110,12 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
CheckpointOptions checkpointOptions) {}
@Override
- public Future<BlobKey> requestTaskManagerLog(Time timeout) {
- return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+ public CompletableFuture<BlobKey> requestTaskManagerLog(Time timeout) {
+ return FutureUtils.completedExceptionally(new UnsupportedOperationException());
}
@Override
- public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
- return FlinkCompletableFuture.completedExceptionally(new UnsupportedOperationException());
+ public CompletableFuture<BlobKey> requestTaskManagerStdout(Time timeout) {
+ return FutureUtils.completedExceptionally(new UnsupportedOperationException());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index b444640..4cc4f11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.rpc.RpcService;
@@ -35,6 +34,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -82,7 +82,7 @@ public class SlotPoolRpcTest {
);
pool.start(UUID.randomUUID(), "foobar");
- Future<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
try {
future.get(4, TimeUnit.SECONDS);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index cf95461..3e2293b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -41,6 +40,7 @@ import org.mockito.ArgumentCaptor;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
@@ -85,7 +85,7 @@ public class SlotPoolTest extends TestLogger {
this.resourceManagerGateway = mock(ResourceManagerGateway.class);
when(resourceManagerGateway
.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
- .thenReturn(mock(Future.class, RETURNS_MOCKS));
+ .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
slotPool.connectToResourceManager(UUID.randomUUID(), resourceManagerGateway);
}
@@ -101,7 +101,7 @@ public class SlotPoolTest extends TestLogger {
slotPool.registerTaskManager(resourceID);
ScheduledUnit task = mock(ScheduledUnit.class);
- Future<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(task, DEFAULT_TESTING_PROFILE, null);
assertFalse(future.isDone());
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -126,8 +126,8 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);
- Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
- Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
assertFalse(future1.isDone());
assertFalse(future2.isDone());
@@ -165,7 +165,7 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);
- Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
assertFalse(future1.isDone());
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -182,7 +182,7 @@ public class SlotPoolTest extends TestLogger {
// return this slot to pool
slot1.releaseSlot();
- Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
// second allocation fulfilled by previous slot returning
SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
@@ -200,7 +200,7 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);
- Future<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
assertFalse(future.isDone());
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
@@ -241,14 +241,14 @@ public class SlotPoolTest extends TestLogger {
ResourceID resourceID = new ResourceID("resource");
slotPool.registerTaskManager(resourceID);
- Future<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future1 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
- Future<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+ CompletableFuture<SimpleSlot> future2 = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
assertTrue(slotPool.offerSlot(allocatedSlot));
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 0b25e6c..48a1d45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -34,7 +33,6 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -50,6 +48,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.net.InetAddress;
import java.net.URL;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -177,7 +176,7 @@ public class JobMasterTest extends TestLogger {
anyString(),
any(JobID.class),
any(Time.class)
- )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JobMasterRegistrationSuccess(
+ )).thenReturn(CompletableFuture.completedFuture(new JobMasterRegistrationSuccess(
heartbeatInterval, rmLeaderId, rmResourceId)));
final TestingSerialRpcService rpc = new TestingSerialRpcService();
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 9a4917a..da992bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.registration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.util.TestLogger;
@@ -123,8 +122,8 @@ public class RetryingRegistrationTest extends TestLogger {
// RPC service that fails upon the first connection, but succeeds on the second
RpcService rpc = mock(RpcService.class);
when(rpc.connect(anyString(), any(Class.class))).thenReturn(
- FlinkCompletableFuture.completedExceptionally(new Exception("test connect failure")), // first connection attempt fails
- FlinkCompletableFuture.completed(testGateway) // second connection attempt succeeds
+ FutureUtils.completedExceptionally(new Exception("test connect failure")), // first connection attempt fails
+ CompletableFuture.completedFuture(testGateway) // second connection attempt succeeds
);
when(rpc.getExecutor()).thenReturn(executor);
@@ -245,8 +244,8 @@ public class RetryingRegistrationTest extends TestLogger {
TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(
- FlinkCompletableFuture.<RegistrationResponse>completedExceptionally(new Exception("test exception")),
- FlinkCompletableFuture.<RegistrationResponse>completed(new TestRegistrationSuccess(testId)));
+ FutureUtils.completedExceptionally(new Exception("test exception")),
+ CompletableFuture.completedFuture(new TestRegistrationSuccess(testId)));
rpc.registerGateway(testEndpointAddress, testGateway);
@@ -281,7 +280,7 @@ public class RetryingRegistrationTest extends TestLogger {
TestingRpcService rpc = new TestingRpcService();
try {
- FlinkCompletableFuture<RegistrationResponse> result = new FlinkCompletableFuture<>();
+ CompletableFuture<RegistrationResponse> result = new CompletableFuture<>();
TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result);
@@ -340,7 +339,7 @@ public class RetryingRegistrationTest extends TestLogger {
@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(
TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
- return FutureUtils.toJava(gateway.registrationCall(leaderId, timeoutMillis));
+ return gateway.registrationCall(leaderId, timeoutMillis);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
index 1b23fa3..4cfbc12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
@@ -18,13 +18,12 @@
package org.apache.flink.runtime.registration;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.rpc.TestingGatewayBase;
import org.apache.flink.util.Preconditions;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
/**
@@ -47,7 +46,7 @@ public class TestRegistrationGateway extends TestingGatewayBase {
// ------------------------------------------------------------------------
- public Future<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
+ public CompletableFuture<RegistrationResponse> registrationCall(UUID leaderId, long timeout) {
invocations.add(new RegistrationCall(leaderId, timeout));
RegistrationResponse response = responses[pos];
@@ -56,7 +55,7 @@ public class TestRegistrationGateway extends TestingGatewayBase {
}
// return a completed future (for a proper value), or one that never completes and will time out (for null)
- return response != null ? FlinkCompletableFuture.completed(response) : this.<RegistrationResponse>futureWithTimeout(timeout);
+ return response != null ? CompletableFuture.completedFuture(response) : futureWithTimeout(timeout);
}
public BlockingQueue<RegistrationCall> getInvocations() {
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
index 4d5964e..7b8703e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
@@ -34,6 +33,7 @@ import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -83,7 +83,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
jobLeaderIdService.addJob(jobId);
- Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+ CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
// notify the leader id service about the new leader
leaderRetrievalService.notifyListener(address, leaderId);
@@ -117,7 +117,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
jobLeaderIdService.addJob(jobId);
- Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+ CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
// remove the job before we could find a leader
jobLeaderIdService.removeJob(jobId);
@@ -228,7 +228,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
jobLeaderIdService.addJob(jobId);
- Future<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+ CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
// notify the leader id service about the new leader
leaderRetrievalService.notifyListener(address, leaderId);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 4836f74..6480d75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -43,6 +42,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue;
@@ -74,11 +74,11 @@ public class ResourceManagerJobMasterTest extends TestLogger {
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
- final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+ final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
// test response successful
- Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(
rmLeaderSessionId,
jmLeaderID,
jmResourceId,
@@ -104,12 +104,12 @@ public class ResourceManagerJobMasterTest extends TestLogger {
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
- final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+ final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
UUID differentLeaderSessionID = UUID.randomUUID();
- Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
differentLeaderSessionID,
jmLeaderID,
jmResourceId,
@@ -134,14 +134,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
"localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID);
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
- final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+ final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
UUID differentLeaderSessionID = UUID.randomUUID();
- Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(
rmLeaderSessionId,
differentLeaderSessionID,
jmResourceId,
@@ -166,14 +166,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
"localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID);
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
- final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+ final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
// test throw exception when receive a registration from job master which takes invalid address
String invalidAddress = "/jobMasterAddress2";
- Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(
rmLeaderSessionId,
jmLeaderSessionId,
jmResourceId,
@@ -198,14 +198,14 @@ public class ResourceManagerJobMasterTest extends TestLogger {
"localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID);
TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
- final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+ final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
JobID unknownJobIDToHAServices = new JobID();
// verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener
- Future<RegistrationResponse> declineFuture = resourceManager.registerJobManager(
+ CompletableFuture<RegistrationResponse> declineFuture = resourceManager.registerJobManager(
rmLeaderSessionId,
jmLeaderSessionId,
jmResourceId,
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 85b7eb4..4127cea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -41,6 +40,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertNotEquals;
@@ -89,13 +89,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
public void testRegisterTaskExecutor() throws Exception {
try {
// test response successful
- Future<RegistrationResponse> successfulFuture =
+ CompletableFuture<RegistrationResponse> successfulFuture =
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
assertTrue(response instanceof TaskExecutorRegistrationSuccess);
// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
- Future<RegistrationResponse> duplicateFuture =
+ CompletableFuture<RegistrationResponse> duplicateFuture =
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport);
RegistrationResponse duplicateResponse = duplicateFuture.get();
assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
@@ -115,7 +115,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
try {
// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
UUID differentLeaderSessionID = UUID.randomUUID();
- Future<RegistrationResponse> unMatchedLeaderFuture =
+ CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport);
assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
} finally {
@@ -133,7 +133,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
try {
// test that a registration from a taskExecutor with an invalid address is declined
String invalidAddress = "/taskExecutor2";
- Future<RegistrationResponse> invalidAddressFuture =
+ CompletableFuture<RegistrationResponse> invalidAddressFuture =
resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport);
assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
} finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 39c5f25..93e96a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
-import org.apache.flink.runtime.concurrent.*;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -44,7 +44,7 @@ import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -114,7 +114,7 @@ public class SlotManagerTest extends TestLogger {
any(AllocationID.class),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(new FlinkCompletableFuture<Acknowledge>());
+ any(Time.class))).thenReturn(new CompletableFuture<>());
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -241,7 +241,7 @@ public class SlotManagerTest extends TestLogger {
eq(allocationId),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -280,7 +280,7 @@ public class SlotManagerTest extends TestLogger {
any(AllocationID.class),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(new FlinkCompletableFuture<Acknowledge>());
+ any(Time.class))).thenReturn(new CompletableFuture<>());
final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
@@ -338,7 +338,7 @@ public class SlotManagerTest extends TestLogger {
eq(allocationId),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -482,7 +482,7 @@ public class SlotManagerTest extends TestLogger {
any(AllocationID.class),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -527,7 +527,7 @@ public class SlotManagerTest extends TestLogger {
any(AllocationID.class),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -747,8 +747,8 @@ public class SlotManagerTest extends TestLogger {
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
- final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = new FlinkCompletableFuture<>();
- final FlinkCompletableFuture<Acknowledge> slotRequestFuture2 = new FlinkCompletableFuture<>();
+ final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
+ final CompletableFuture<Acknowledge> slotRequestFuture2 = new CompletableFuture<>();
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
when(taskExecutorGateway.requestSlot(
@@ -826,7 +826,7 @@ public class SlotManagerTest extends TestLogger {
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
- final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = new FlinkCompletableFuture<>();
+ final CompletableFuture<Acknowledge> slotRequestFuture1 = new CompletableFuture<>();
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
when(taskExecutorGateway.requestSlot(
@@ -835,7 +835,7 @@ public class SlotManagerTest extends TestLogger {
eq(allocationId),
anyString(),
any(UUID.class),
- any(Time.class))).thenReturn(slotRequestFuture1, FlinkCompletableFuture.completed(Acknowledge.get()));
+ any(Time.class))).thenReturn(slotRequestFuture1, CompletableFuture.completedFuture(Acknowledge.get()));
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -856,24 +856,21 @@ public class SlotManagerTest extends TestLogger {
slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
- Future<Void> registrationFuture = FlinkFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
+ CompletableFuture<Void> registrationFuture = CompletableFuture.supplyAsync(
+ () -> {
slotManager.registerTaskManager(taskManagerConnection, slotReport);
return null;
- }
- }, mainThreadExecutor)
- .thenAccept(new AcceptFunction<Void>() {
- @Override
- public void accept(Void value) {
+ },
+ mainThreadExecutor)
+ .thenAccept(
+ (Object value) -> {
try {
slotManager.registerSlotRequest(slotRequest);
} catch (SlotManagerException e) {
throw new RuntimeException("Could not register slots.", e);
}
- }
- });
+ });
// check that no exception has been thrown
registrationFuture.get();
@@ -891,12 +888,9 @@ public class SlotManagerTest extends TestLogger {
final SlotID requestedSlotId = slotIdCaptor.getValue();
final SlotID freeSlotId = requestedSlotId.equals(slotId1) ? slotId2 : slotId1;
- Future<Boolean> freeSlotFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return slotManager.getSlot(freeSlotId).isFree();
- }
- }, mainThreadExecutor);
+ CompletableFuture<Boolean> freeSlotFuture = CompletableFuture.supplyAsync(
+ () -> slotManager.getSlot(freeSlotId).isFree(),
+ mainThreadExecutor);
assertTrue(freeSlotFuture.get());
@@ -904,15 +898,10 @@ public class SlotManagerTest extends TestLogger {
final SlotStatus newSlotStatus2 = new SlotStatus(freeSlotId, resourceProfile);
final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
- FlinkFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- // this should update the slot with the pending slot request triggering the reassignment of it
- slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport);
-
- return null;
- }
- }, mainThreadExecutor);
+ CompletableFuture.supplyAsync(
+ // this should update the slot with the pending slot request triggering the reassignment of it
+ () -> slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport),
+ mainThreadExecutor);
verify(taskExecutorGateway, timeout(verifyTimeout).times(2)).requestSlot(
slotIdCaptor.capture(),
@@ -926,12 +915,9 @@ public class SlotManagerTest extends TestLogger {
assertEquals(slotId2, requestedSlotId2);
- Future<TaskManagerSlot> requestedSlotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
- @Override
- public TaskManagerSlot call() throws Exception {
- return slotManager.getSlot(requestedSlotId2);
- }
- }, mainThreadExecutor);
+ CompletableFuture<TaskManagerSlot> requestedSlotFuture = CompletableFuture.supplyAsync(
+ () -> slotManager.getSlot(requestedSlotId2),
+ mainThreadExecutor);
TaskManagerSlot slot = requestedSlotFuture.get();
@@ -967,7 +953,7 @@ public class SlotManagerTest extends TestLogger {
eq(allocationId),
anyString(),
eq(leaderId),
- any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+ any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -987,20 +973,16 @@ public class SlotManagerTest extends TestLogger {
slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
- FlinkFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- slotManager.registerSlotRequest(slotRequest);
-
- return null;
- }
- }, mainThreadExecutor)
- .thenAccept(new AcceptFunction<Void>() {
- @Override
- public void accept(Void value) {
- slotManager.registerTaskManager(taskManagerConnection, initialSlotReport);
- }
- });
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return slotManager.registerSlotRequest(slotRequest);
+ } catch (SlotManagerException e) {
+ throw new FlinkFutureException(e);
+ }
+ },
+ mainThreadExecutor)
+ .thenAccept((Object value) -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class);
@@ -1012,44 +994,28 @@ public class SlotManagerTest extends TestLogger {
eq(leaderId),
any(Time.class));
- Future<Boolean> idleFuture = FlinkFuture.supplyAsync(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
- }
- }, mainThreadExecutor);
+ CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync(
+ () -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()),
+ mainThreadExecutor);
// check that the TaskManager is not idle
assertFalse(idleFuture.get());
final SlotID slotId = slotIdArgumentCaptor.getValue();
- Future<TaskManagerSlot> slotFuture = FlinkFuture.supplyAsync(new Callable<TaskManagerSlot>() {
- @Override
- public TaskManagerSlot call() throws Exception {
- return slotManager.getSlot(slotId);
- }
- }, mainThreadExecutor);
+ CompletableFuture<TaskManagerSlot> slotFuture = CompletableFuture.supplyAsync(
+ () -> slotManager.getSlot(slotId),
+ mainThreadExecutor);
TaskManagerSlot slot = slotFuture.get();
assertTrue(slot.isAllocated());
assertEquals(allocationId, slot.getAllocationId());
- Future<Boolean> idleFuture2 = FlinkFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- slotManager.freeSlot(slotId, allocationId);
-
- return null;
- }
- }, mainThreadExecutor)
- .thenApply(new ApplyFunction<Void, Boolean>() {
- @Override
- public Boolean apply(Void value) {
- return slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID());
- }
- });
+ CompletableFuture<Boolean> idleFuture2 = CompletableFuture.runAsync(
+ () -> slotManager.freeSlot(slotId, allocationId),
+ mainThreadExecutor)
+ .thenApply((Object value) -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()));
assertTrue(idleFuture2.get());
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index a1ab1ab..844e159 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.SlotReport;
@@ -41,6 +40,7 @@ import org.mockito.Mockito;
import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -104,7 +104,7 @@ public class SlotProtocolTest extends TestLogger {
Mockito.when(
taskExecutorGateway
.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
- .thenReturn(mock(FlinkFuture.class));
+ .thenReturn(mock(CompletableFuture.class));
final ResourceID resourceID = ResourceID.generate();
final SlotID slotID = new SlotID(resourceID, 0);
@@ -139,7 +139,7 @@ public class SlotProtocolTest extends TestLogger {
Mockito.when(
taskExecutorGateway
.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
- .thenReturn(mock(FlinkFuture.class));
+ .thenReturn(mock(CompletableFuture.class));
try (SlotManager slotManager = new SlotManager(
scheduledExecutor,
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index e636d6c..4be5257 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -23,14 +23,13 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Test;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
@@ -90,9 +89,8 @@ public class AsyncCallsTest extends TestLogger {
});
}
- Future<String> result = testEndpoint.callAsync(new Callable<String>() {
- @Override
- public String call() throws Exception {
+ CompletableFuture<String> result = testEndpoint.callAsync(
+ () -> {
boolean holdsLock = lock.tryLock();
if (holdsLock) {
lock.unlock();
@@ -100,8 +98,8 @@ public class AsyncCallsTest extends TestLogger {
concurrentAccess.set(true);
}
return "test";
- }
- }, Time.seconds(30L));
+ },
+ Time.seconds(30L));
String str = result.get(30, TimeUnit.SECONDS);
assertEquals("test", str);
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index bbde331..07dadae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.rpc;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.util.ReflectionUtil;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -42,6 +41,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -74,7 +74,7 @@ public class RpcCompletenessTest extends TestLogger {
private static Logger LOG = LoggerFactory.getLogger(RpcCompletenessTest.class);
- private static final Class<?> futureClass = Future.class;
+ private static final Class<?> futureClass = CompletableFuture.class;
private static final Class<?> timeoutClass = Time.class;
@Test
@@ -195,7 +195,7 @@ public class RpcCompletenessTest extends TestLogger {
/**
* Checks whether the gateway method fulfills the gateway method requirements.
* <ul>
- * <li>It checks whether the return type is void or a {@link Future} wrapping the actual result. </li>
+ * <li>It checks whether the return type is void or a {@link CompletableFuture} wrapping the actual result. </li>
* <li>It checks that the method's parameter list contains at most one parameter annotated with {@link RpcTimeout}.</li>
* </ul>
*
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index e05c8d8..4220fff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -23,7 +23,6 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -33,6 +32,7 @@ import org.junit.Test;
import scala.Option;
import scala.Tuple2;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -57,7 +57,7 @@ public class RpcConnectionTest {
// can only pass if the connection problem is not recognized merely via a timeout
rpcService = new AkkaRpcService(actorSystem, Time.of(10000000, TimeUnit.SECONDS));
- Future<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
+ CompletableFuture<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
future.get(10000000, TimeUnit.SECONDS);
fail("should never complete normally");
http://git-wip-us.apache.org/repos/asf/flink/blob/eddafc1a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
index 03fe84b..ccf0acd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java
@@ -18,10 +18,7 @@
package org.apache.flink.runtime.rpc;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -77,8 +74,8 @@ public abstract class TestingGatewayBase implements RpcGateway {
// utilities
// ------------------------------------------------------------------------
- public <T> Future<T> futureWithTimeout(long timeoutMillis) {
- FlinkCompletableFuture<T> future = new FlinkCompletableFuture<>();
+ public <T> CompletableFuture<T> futureWithTimeout(long timeoutMillis) {
+ CompletableFuture<T> future = new CompletableFuture<>();
executor.schedule(new FutureTimeout(future), timeoutMillis, TimeUnit.MILLISECONDS);
return future;
}