You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/01 21:28:21 UTC
flink git commit: [FLINK-7317] [futures] Replace Flink's futures with
Java 8's CompletableFuture in ExecutionGraph
Repository: flink
Updated Branches:
refs/heads/master d2a8e3741 -> 7e4694b81
[FLINK-7317] [futures] Replace Flink's futures with Java 8's CompletableFuture in ExecutionGraph
Change FutureUtils.retry to work with CompletableFutures
Let ConjunctFutures extends CompletableFuture
Remove Flink's futures from ExecutionGraph
This closes #4433.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e4694b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e4694b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e4694b8
Branch: refs/heads/master
Commit: 7e4694b8198664300b13b5304c62271b192d4512
Parents: d2a8e37
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 16:33:21 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 1 23:27:56 2017 +0200
----------------------------------------------------------------------
.../webmonitor/StackTraceSampleCoordinator.java | 14 +-
.../StackTraceSampleCoordinatorTest.java | 10 +-
.../flink/runtime/concurrent/FutureUtils.java | 107 ++++++-------
.../flink/runtime/executiongraph/Execution.java | 152 +++++++++----------
.../executiongraph/ExecutionAndSlot.java | 7 +-
.../runtime/executiongraph/ExecutionGraph.java | 45 +++---
.../executiongraph/ExecutionGraphUtils.java | 6 +-
.../executiongraph/ExecutionJobVertex.java | 19 +--
.../runtime/executiongraph/ExecutionVertex.java | 4 +-
.../executiongraph/failover/FailoverRegion.java | 14 +-
.../failover/RestartIndividualStrategy.java | 14 +-
.../runtime/blob/BlobServerDeleteTest.java | 20 +--
.../flink/runtime/blob/BlobServerGetTest.java | 20 +--
.../flink/runtime/blob/BlobServerPutTest.java | 18 +--
.../runtime/concurrent/FutureUtilsTest.java | 61 ++++----
.../executiongraph/ExecutionGraphUtilsTest.java | 18 +--
16 files changed, 236 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
index 3521f58..df15b48 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -159,13 +158,12 @@ public class StackTraceSampleCoordinator {
// Trigger all samples
for (Execution execution: executions) {
- final CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = FutureUtils.toJava(
- execution.requestStackTraceSample(
- sampleId,
- numSamples,
- delayBetweenSamples,
- maxStackTraceDepth,
- timeout));
+ final CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = execution.requestStackTraceSample(
+ sampleId,
+ numSamples,
+ delayBetweenSamples,
+ maxStackTraceDepth,
+ timeout);
stackTraceSampleFuture.handleAsync(
(StackTraceSampleResponse stackTraceSampleResponse, Throwable throwable) -> {
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
index 7d8535a..08c4212 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -394,13 +393,16 @@ public class StackTraceSampleCoordinatorTest extends TestLogger {
boolean sendSuccess) {
Execution exec = mock(Execution.class);
+ CompletableFuture<StackTraceSampleResponse> failedFuture = new CompletableFuture<>();
+ failedFuture.completeExceptionally(new Exception("Send failed."));
+
when(exec.getAttemptId()).thenReturn(executionId);
when(exec.getState()).thenReturn(state);
when(exec.requestStackTraceSample(anyInt(), anyInt(), any(Time.class), anyInt(), any(Time.class)))
.thenReturn(
sendSuccess ?
- FlinkCompletableFuture.completed(mock(StackTraceSampleResponse.class)) :
- FlinkCompletableFuture.completedExceptionally(new Exception("Send failed")));
+ CompletableFuture.completedFuture(mock(StackTraceSampleResponse.class)) :
+ failedFuture);
ExecutionVertex vertex = mock(ExecutionVertex.class);
when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
@@ -415,7 +417,7 @@ public class StackTraceSampleCoordinatorTest extends TestLogger {
ScheduledExecutorService scheduledExecutorService,
int timeout) {
- final FlinkCompletableFuture<StackTraceSampleResponse> future = new FlinkCompletableFuture<>();
+ final CompletableFuture<StackTraceSampleResponse> future = new CompletableFuture<>();
Execution exec = mock(Execution.class);
when(exec.getAttemptId()).thenReturn(executionId);
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 8721e52..eb0c07d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -50,42 +50,38 @@ public class FutureUtils {
* @param <T> type of the result
* @return Future containing either the result of the operation or a {@link RetryException}
*/
- public static <T> Future<T> retry(
- final Callable<Future<T>> operation,
+ public static <T> java.util.concurrent.CompletableFuture<T> retry(
+ final Callable<java.util.concurrent.CompletableFuture<T>> operation,
final int retries,
final Executor executor) {
- Future<T> operationResultFuture;
+ java.util.concurrent.CompletableFuture<T> operationResultFuture;
try {
operationResultFuture = operation.call();
} catch (Exception e) {
- return FlinkCompletableFuture.completedExceptionally(
- new RetryException("Could not execute the provided operation.", e));
+ java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
+ exceptionResult.completeExceptionally(new RetryException("Could not execute the provided operation.", e));
+ return exceptionResult;
}
- return operationResultFuture.handleAsync(new BiFunction<T, Throwable, Future<T>>() {
- @Override
- public Future<T> apply(T t, Throwable throwable) {
+ return operationResultFuture.handleAsync(
+ (t, throwable) -> {
if (throwable != null) {
if (retries > 0) {
return retry(operation, retries - 1, executor);
} else {
- return FlinkCompletableFuture.completedExceptionally(
- new RetryException("Could not complete the operation. Number of retries " +
- "has been exhausted.", throwable));
+ java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
+ exceptionResult.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
+ "has been exhausted.", throwable));
+ return exceptionResult;
}
} else {
- return FlinkCompletableFuture.completed(t);
+ return java.util.concurrent.CompletableFuture.completedFuture(t);
}
- }
- }, executor)
- .thenCompose(new ApplyFunction<Future<T>, Future<T>>() {
- @Override
- public Future<T> apply(Future<T> value) {
- return value;
- }
- });
+ },
+ executor)
+ .thenCompose(value -> value);
}
public static class RetryException extends Exception {
@@ -121,17 +117,17 @@ public class FutureUtils {
* @param futures The futures that make up the conjunction. No null entries are allowed.
* @return The ConjunctFuture that completes once all given futures are complete (or one fails).
*/
- public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends Future<? extends T>> futures) {
+ public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends java.util.concurrent.CompletableFuture<? extends T>> futures) {
checkNotNull(futures, "futures");
final ResultConjunctFuture<T> conjunct = new ResultConjunctFuture<>(futures.size());
if (futures.isEmpty()) {
- conjunct.complete(Collections.<T>emptyList());
+ conjunct.complete(Collections.emptyList());
}
else {
- for (Future<? extends T> future : futures) {
- future.handle(conjunct.completionHandler);
+ for (java.util.concurrent.CompletableFuture<? extends T> future : futures) {
+ future.whenComplete(conjunct::handleCompletedFuture);
}
}
@@ -149,7 +145,7 @@ public class FutureUtils {
* @param futures The futures to wait on. No null entries are allowed.
* @return The WaitingFuture that completes once all given futures are complete (or one fails).
*/
- public static ConjunctFuture<Void> waitForAll(Collection<? extends Future<?>> futures) {
+ public static ConjunctFuture<Void> waitForAll(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
checkNotNull(futures, "futures");
return new WaitingConjunctFuture(futures);
@@ -164,25 +160,25 @@ public class FutureUtils {
* {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
* many of the Futures are already complete.
*/
- public interface ConjunctFuture<T> extends CompletableFuture<T> {
+ public abstract static class ConjunctFuture<T> extends java.util.concurrent.CompletableFuture<T> {
/**
* Gets the total number of Futures in the conjunction.
* @return The total number of Futures in the conjunction.
*/
- int getNumFuturesTotal();
+ public abstract int getNumFuturesTotal();
/**
* Gets the number of Futures in the conjunction that are already complete.
* @return The number of Futures in the conjunction that are already complete
*/
- int getNumFuturesCompleted();
+ public abstract int getNumFuturesCompleted();
}
/**
* The implementation of the {@link ConjunctFuture} which returns its Futures' result as a collection.
*/
- private static class ResultConjunctFuture<T> extends FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<Collection<T>> {
+ private static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T>> {
/** The total number of futures in the conjunction */
private final int numTotal;
@@ -199,25 +195,19 @@ public class FutureUtils {
/** The function that is attached to all futures in the conjunction. Once a future
* is complete, this function tracks the completion or fails the conjunct.
*/
- final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable, Void>() {
-
- @Override
- public Void apply(T o, Throwable throwable) {
- if (throwable != null) {
- completeExceptionally(throwable);
- } else {
- int index = nextIndex.getAndIncrement();
+ final void handleCompletedFuture(T value, Throwable throwable) {
+ if (throwable != null) {
+ completeExceptionally(throwable);
+ } else {
+ int index = nextIndex.getAndIncrement();
- results[index] = o;
+ results[index] = value;
- if (numCompleted.incrementAndGet() == numTotal) {
- complete(Arrays.asList(results));
- }
+ if (numCompleted.incrementAndGet() == numTotal) {
+ complete(Arrays.asList(results));
}
-
- return null;
}
- };
+ }
@SuppressWarnings("unchecked")
ResultConjunctFuture(int numTotal) {
@@ -240,7 +230,7 @@ public class FutureUtils {
* Implementation of the {@link ConjunctFuture} interface which waits only for the completion
* of its futures and does not return their values.
*/
- private static final class WaitingConjunctFuture extends FlinkCompletableFuture<Void> implements ConjunctFuture<Void> {
+ private static final class WaitingConjunctFuture extends ConjunctFuture<Void> {
/** Number of completed futures */
private final AtomicInteger numCompleted = new AtomicInteger(0);
@@ -248,23 +238,18 @@ public class FutureUtils {
/** Total number of futures to wait on */
private final int numTotal;
- /** Handler which increments the atomic completion counter and completes or fails the WaitingFutureImpl */
- private final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
- @Override
- public Void apply(Object o, Throwable throwable) {
- if (throwable == null) {
- if (numTotal == numCompleted.incrementAndGet()) {
- complete(null);
- }
- } else {
- completeExceptionally(throwable);
+ /** Method which increments the atomic completion counter and completes or fails the WaitingFutureImpl */
+ private void handleCompletedFuture(Object ignored, Throwable throwable) {
+ if (throwable == null) {
+ if (numTotal == numCompleted.incrementAndGet()) {
+ complete(null);
}
-
- return null;
+ } else {
+ completeExceptionally(throwable);
}
- };
+ }
- private WaitingConjunctFuture(Collection<? extends Future<?>> futures) {
+ private WaitingConjunctFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
Preconditions.checkNotNull(futures, "Futures must not be null.");
this.numTotal = futures.size();
@@ -272,8 +257,8 @@ public class FutureUtils {
if (futures.isEmpty()) {
complete(null);
} else {
- for (Future<?> future : futures) {
- future.handle(completionHandler);
+ for (java.util.concurrent.CompletableFuture<?> future : futures) {
+ future.whenComplete(this::handleCompletedFuture);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c0f1f39..66dee0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -25,12 +25,7 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -56,7 +51,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
@@ -129,7 +124,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors;
/** A future that completes once the Execution reaches a terminal ExecutionState */
- private final FlinkCompletableFuture<ExecutionState> terminationFuture;
+ private final CompletableFuture<ExecutionState> terminationFuture;
private volatile ExecutionState state = CREATED;
@@ -189,7 +184,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
markTimestamp(ExecutionState.CREATED, startTimestamp);
this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
- this.terminationFuture = new FlinkCompletableFuture<>();
+ this.terminationFuture = new CompletableFuture<>();
}
// --------------------------------------------------------------------------------------------
@@ -279,7 +274,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
*
* @return A future for the execution's termination
*/
- public Future<ExecutionState> getTerminationFuture() {
+ public CompletableFuture<ExecutionState> getTerminationFuture() {
return terminationFuture;
}
@@ -306,14 +301,13 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
*/
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
try {
- final Future<SimpleSlot> slotAllocationFuture = allocateSlotForExecution(slotProvider, queued);
+ final CompletableFuture<SimpleSlot> slotAllocationFuture = allocateSlotForExecution(slotProvider, queued);
// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
// that we directly deploy the tasks if the slot allocation future is completed. This is
// necessary for immediate deployment.
- final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
- @Override
- public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+ final CompletableFuture<Void> deploymentFuture = slotAllocationFuture.handle(
+ (simpleSlot, throwable) -> {
if (simpleSlot != null) {
try {
deployToSlot(simpleSlot);
@@ -330,7 +324,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
return null;
}
- });
+ );
// if tasks have to scheduled immediately check that the task has been deployed
if (!queued && !deploymentFuture.isDone()) {
@@ -344,7 +338,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
}
- public Future<SimpleSlot> allocateSlotForExecution(SlotProvider slotProvider, boolean queued)
+ public CompletableFuture<SimpleSlot> allocateSlotForExecution(SlotProvider slotProvider, boolean queued)
throws IllegalExecutionStateException {
checkNotNull(slotProvider);
@@ -365,7 +359,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
new ScheduledUnit(this, sharingGroup) :
new ScheduledUnit(this, sharingGroup, locationConstraint);
- return slotProvider.allocateSlot(toSchedule, queued);
+ return FutureUtils.toJava(slotProvider.allocateSlot(toSchedule, queued));
}
else {
// call race, already deployed, or already done
@@ -424,24 +418,25 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
- final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
-
- submitResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
- @Override
- public Void apply(Throwable failure) {
- if (failure instanceof TimeoutException) {
- String taskname = vertex.getTaskNameWithSubtaskIndex()+ " (" + attemptId + ')';
-
- markFailed(new Exception(
- "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
- + ") not responding after a timeout of " + timeout, failure));
- }
- else {
- markFailed(failure);
+ final CompletableFuture<Acknowledge> submitResultFuture = FutureUtils.toJava(
+ taskManagerGateway.submitTask(deployment, timeout));
+
+ submitResultFuture.whenCompleteAsync(
+ (ack, failure) -> {
+ // only respond to the failure case
+ if (failure != null) {
+ if (failure instanceof TimeoutException) {
+ String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
+
+ markFailed(new Exception(
+ "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
+ + ") not responding after a timeout of " + timeout, failure));
+ } else {
+ markFailed(failure);
+ }
}
- return null;
- }
- }, executor);
+ },
+ executor);
}
catch (Throwable t) {
markFailed(t);
@@ -458,24 +453,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
- Future<Acknowledge> stopResultFuture = FutureUtils.retry(
- new Callable<Future<Acknowledge>>() {
-
- @Override
- public Future<Acknowledge> call() throws Exception {
- return taskManagerGateway.stopTask(attemptId, timeout);
- }
- },
+ CompletableFuture<Acknowledge> stopResultFuture = FutureUtils.retry(
+ () -> FutureUtils.toJava(taskManagerGateway.stopTask(attemptId, timeout)),
NUM_STOP_CALL_TRIES,
executor);
- stopResultFuture.exceptionally(new ApplyFunction<Throwable, Void>() {
- @Override
- public Void apply(Throwable failure) {
+ stopResultFuture.exceptionally(
+ failure -> {
LOG.info("Stopping task was not successful.", failure);
return null;
- }
- });
+ });
}
}
@@ -575,9 +562,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
// TODO The current approach may send many update messages even though the consuming
// task has already been deployed with all necessary information. We have to check
// whether this is a problem and fix it, if it is.
- FlinkFuture.supplyAsync(new Callable<Void>(){
- @Override
- public Void call() throws Exception {
+ CompletableFuture.supplyAsync(
+ () -> {
try {
consumerVertex.scheduleForExecution(
consumerVertex.getExecutionGraph().getSlotProvider(),
@@ -588,8 +574,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
}
return null;
- }
- }, executor);
+ },
+ executor);
// double check to resolve race conditions
if(consumerVertex.getExecutionState() == RUNNING){
@@ -681,7 +667,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
* @param timeout until the request times out
* @return Future stack trace sample response
*/
- public Future<StackTraceSampleResponse> requestStackTraceSample(
+ public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
int sampleId,
int numSamples,
Time delayBetweenSamples,
@@ -693,15 +679,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
- return taskManagerGateway.requestStackTraceSample(
- attemptId,
- sampleId,
- numSamples,
- delayBetweenSamples,
- maxStrackTraceDepth,
- timeout);
+ return FutureUtils.toJava(
+ taskManagerGateway.requestStackTraceSample(
+ attemptId,
+ sampleId,
+ numSamples,
+ delayBetweenSamples,
+ maxStrackTraceDepth,
+ timeout));
} else {
- return FlinkCompletableFuture.completedExceptionally(new Exception("The execution has no slot assigned."));
+ CompletableFuture<StackTraceSampleResponse> result = new CompletableFuture<>();
+ result.completeExceptionally(new Exception("The execution has no slot assigned."));
+
+ return result;
}
}
@@ -1023,23 +1013,18 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
- Future<Acknowledge> cancelResultFuture = FutureUtils.retry(
- new Callable<Future<Acknowledge>>() {
- @Override
- public Future<Acknowledge> call() throws Exception {
- return taskManagerGateway.cancelTask(attemptId, timeout);
- }
- },
+ CompletableFuture<Acknowledge> cancelResultFuture = FutureUtils.retry(
+ () -> FutureUtils.toJava(taskManagerGateway.cancelTask(attemptId, timeout)),
NUM_CANCEL_CALL_TRIES,
executor);
- cancelResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
- @Override
- public Void apply(Throwable failure) {
- fail(new Exception("Task could not be canceled.", failure));
- return null;
- }
- }, executor);
+ cancelResultFuture.whenCompleteAsync(
+ (ack, failure) -> {
+ if (failure != null) {
+ fail(new Exception("Task could not be canceled.", failure));
+ }
+ },
+ executor);
}
}
@@ -1068,16 +1053,17 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
- Future<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, timeout);
+ CompletableFuture<Acknowledge> updatePartitionsResultFuture = FutureUtils.toJava(
+ taskManagerGateway.updatePartitions(attemptId, partitionInfos, timeout));
- updatePartitionsResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
- @Override
- public Void apply(Throwable failure) {
- fail(new IllegalStateException("Update task on TaskManager " + taskManagerLocation +
- " failed due to:", failure));
- return null;
- }
- }, executor);
+ updatePartitionsResultFuture.whenCompleteAsync(
+ (ack, failure) -> {
+ // fail if there was a failure
+ if (failure != null) {
+ fail(new IllegalStateException("Update task on TaskManager " + taskManagerLocation +
+ " failed due to:", failure));
+ }
+ }, executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
index ea6186e..123ff0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
@@ -18,9 +18,10 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.instance.SimpleSlot;
+import java.util.concurrent.CompletableFuture;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -30,9 +31,9 @@ public class ExecutionAndSlot {
public final Execution executionAttempt;
- public final Future<SimpleSlot> slotFuture;
+ public final CompletableFuture<SimpleSlot> slotFuture;
- public ExecutionAndSlot(Execution executionAttempt, Future<SimpleSlot> slotFuture) {
+ public ExecutionAndSlot(Execution executionAttempt, CompletableFuture<SimpleSlot> slotFuture) {
this.executionAttempt = checkNotNull(executionAttempt);
this.slotFuture = checkNotNull(slotFuture);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index dded029..ae9b5f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -38,14 +38,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -91,6 +86,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
@@ -793,7 +790,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
newExecJobVertices.add(ejv);
}
- terminationFuture = new FlinkCompletableFuture<>();
+ terminationFuture = new CompletableFuture<>();
failoverStrategy.notifyNewVertices(newExecJobVertices);
}
@@ -852,7 +849,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
try {
// collecting all the slots may resize and fail in that operation without slots getting lost
- final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+ final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
// allocate the slots (obtain all their futures
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
@@ -887,10 +884,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}, timeout.getSize(), timeout.getUnit());
- allAllocationsComplete.handleAsync(new BiFunction<Void, Throwable, Void>() {
-
- @Override
- public Void apply(Void slots, Throwable throwable) {
+ allAllocationsComplete.handleAsync(
+ (Void slots, Throwable throwable) -> {
try {
// we do not need the cancellation timeout any more
timeoutCancelHandle.cancel(false);
@@ -907,9 +902,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
slot = execAndSlot.slotFuture.getNow(null);
checkNotNull(slot);
}
- catch (ExecutionException | NullPointerException e) {
+ catch (CompletionException | NullPointerException e) {
throw new IllegalStateException("SlotFuture is incomplete " +
- "or erroneous even though all futures completed");
+ "or erroneous even though all futures completed", e);
}
// actual deployment
@@ -938,8 +933,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// Wouldn't it be nice if we could return an actual Void object?
// return (Void) Unsafe.getUnsafe().allocateInstance(Void.class);
return null;
- }
- }, futureExecutor);
+ },
+ futureExecutor);
// from now on, slots will be rescued by the the futures and their completion, or by the timeout
successful = true;
@@ -964,7 +959,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// make sure no concurrent local actions interfere with the cancellation
final long globalVersionForRestart = incrementGlobalModVersion();
- final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
+ final ArrayList<CompletableFuture<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
// cancel all tasks (that still need cancelling)
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
@@ -973,14 +968,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// we build a future that is complete once all vertices have reached a terminal state
final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
- allTerminal.thenAccept(new AcceptFunction<Void>() {
- @Override
- public void accept(Void value) {
+ allTerminal.thenAccept(
+ (Void value) -> {
// cancellations may currently be overridden by failures which trigger
// restarts, so we need to pass a proper restart global version here
allVerticesInTerminalState(globalVersionForRestart);
}
- });
+ );
return;
}
@@ -1126,7 +1120,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
final long globalVersionForRestart = incrementGlobalModVersion();
// we build a future that is complete once all vertices have reached a terminal state
- final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
+ final ArrayList<CompletableFuture<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
// cancel all tasks (that still need cancelling)
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
@@ -1134,12 +1128,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
final ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
- allTerminal.thenAccept(new AcceptFunction<Void>() {
- @Override
- public void accept(Void value) {
- allVerticesInTerminalState(globalVersionForRestart);
- }
- });
+ allTerminal.thenAccept((Void value) -> allVerticesInTerminalState(globalVersionForRestart));
return;
}
@@ -1250,7 +1239,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
@VisibleForTesting
- public Future<JobStatus> getTerminationFuture() {
+ public CompletableFuture<JobStatus> getTerminationFuture() {
return terminationFuture;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
index cd6d6aa..8558533 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
@@ -19,11 +19,11 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.util.ExceptionUtils;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
/**
* Utilities for dealing with the execution graphs and scheduling.
@@ -40,8 +40,8 @@ public class ExecutionGraphUtils {
*
* @param slotFuture The future for the slot to release.
*/
- public static void releaseSlotFuture(Future<SimpleSlot> slotFuture) {
- slotFuture.handle(ReleaseSlotFunction.INSTANCE);
+ public static void releaseSlotFuture(CompletableFuture<SimpleSlot> slotFuture) {
+ slotFuture.handle(ReleaseSlotFunction.INSTANCE::apply);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index f5a592a..5ee7a9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -31,8 +31,6 @@ import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
@@ -53,10 +51,12 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
/**
* An {@code ExecutionJobVertex} is part of the {@link ExecutionGraph}, and the peer
@@ -475,7 +475,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
try {
// allocate the next slot (future)
final Execution exec = vertices[i].getCurrentExecutionAttempt();
- final Future<SimpleSlot> future = exec.allocateSlotForExecution(resourceProvider, queued);
+ final CompletableFuture<SimpleSlot> future = exec.allocateSlotForExecution(resourceProvider, queued);
slots[i] = new ExecutionAndSlot(exec, future);
successful = true;
}
@@ -507,17 +507,14 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
*
* @return A future that is complete once all tasks have canceled.
*/
- public Future<Void> cancelWithFuture() {
+ public CompletableFuture<Void> cancelWithFuture() {
// we collect all futures from the task cancellations
- ArrayList<Future<ExecutionState>> futures = new ArrayList<>(parallelism);
-
- // cancel each vertex
- for (ExecutionVertex ev : getTaskVertices()) {
- futures.add(ev.cancel());
- }
+ CompletableFuture<ExecutionState>[] futures = Arrays.stream(getTaskVertices())
+ .map(ExecutionVertex::cancel)
+ .<CompletableFuture<ExecutionState>>toArray(CompletableFuture[]::new);
// return a conjunct future, which is complete once all individual tasks are canceled
- return FutureUtils.waitForAll(futures);
+ return CompletableFuture.allOf(futures);
}
public void fail(Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e8c1984..0ff71e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.Archiveable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -61,6 +60,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
@@ -604,7 +604,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
*
* @return A future that completes once the execution has reached its final state.
*/
- public Future<ExecutionState> cancel() {
+ public CompletableFuture<ExecutionState> cancel() {
// to avoid any case of mixup in the presence of concurrent calls,
// we copy a reference to the stack to make sure both calls go to the same Execution
final Execution exec = this.currentExecution;
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 6066c77..1919c61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.executiongraph.failover;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -37,6 +35,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -143,7 +142,7 @@ public class FailoverRegion {
if (transitionState(curStatus, JobStatus.CANCELLING)) {
// we build a future that is complete once all vertices have reached a terminal state
- final ArrayList<Future<?>> futures = new ArrayList<>(connectedExecutionVertexes.size());
+ final ArrayList<CompletableFuture<?>> futures = new ArrayList<>(connectedExecutionVertexes.size());
// cancel all tasks (that still need cancelling)
for (ExecutionVertex vertex : connectedExecutionVertexes) {
@@ -151,12 +150,9 @@ public class FailoverRegion {
}
final FutureUtils.ConjunctFuture<Void> allTerminal = FutureUtils.waitForAll(futures);
- allTerminal.thenAcceptAsync(new AcceptFunction<Void>() {
- @Override
- public void accept(Void value) {
- allVerticesInTerminalState(globalModVersionOfFailover);
- }
- }, executor);
+ allTerminal.thenAcceptAsync(
+ (Void value) -> allVerticesInTerminalState(globalModVersionOfFailover),
+ executor);
break;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
index 0e7bca5..80f1d2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.executiongraph.failover;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -36,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -107,14 +106,13 @@ public class RestartIndividualStrategy extends FailoverStrategy {
// Note: currently all tasks passed here are already in their terminal state,
// so we could actually avoid the future. We use it anyways because it is cheap and
// it helps to support better testing
- final Future<ExecutionState> terminationFuture = taskExecution.getTerminationFuture();
+ final CompletableFuture<ExecutionState> terminationFuture = taskExecution.getTerminationFuture();
final ExecutionVertex vertexToRecover = taskExecution.getVertex();
final long globalModVersion = taskExecution.getGlobalModVersion();
- terminationFuture.thenAcceptAsync(new AcceptFunction<ExecutionState>() {
- @Override
- public void accept(ExecutionState value) {
+ terminationFuture.thenAcceptAsync(
+ (ExecutionState value) -> {
try {
long createTimestamp = System.currentTimeMillis();
Execution newExecution = vertexToRecover.resetForNewExecution(createTimestamp, globalModVersion);
@@ -127,8 +125,8 @@ public class RestartIndividualStrategy extends FailoverStrategy {
executionGraph.failGlobal(
new Exception("Error during fine grained recovery - triggering full recovery", e));
}
- }
- }, callbackExecutor);
+ },
+ callbackExecutor);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 7100e79..22271af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -19,9 +19,8 @@
package org.apache.flink.runtime.blob;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -32,7 +31,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -229,7 +228,7 @@ public class BlobServerDeleteTest extends TestLogger {
final int concurrentDeleteOperations = 3;
final ExecutorService executor = Executors.newFixedThreadPool(concurrentDeleteOperations);
- final List<Future<Void>> deleteFutures = new ArrayList<>(concurrentDeleteOperations);
+ final List<CompletableFuture<Void>> deleteFutures = new ArrayList<>(concurrentDeleteOperations);
final byte[] data = {1, 2, 3};
@@ -244,21 +243,22 @@ public class BlobServerDeleteTest extends TestLogger {
assertTrue(blobServer.getStorageLocation(blobKey).exists());
for (int i = 0; i < concurrentDeleteOperations; i++) {
- Future<Void> deleteFuture = FlinkCompletableFuture.supplyAsync(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
+ CompletableFuture<Void> deleteFuture = CompletableFuture.supplyAsync(
+ () -> {
try (BlobClient blobClient = blobServer.createClient()) {
blobClient.delete(blobKey);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not delete the given blob key " + blobKey + '.', e);
}
return null;
- }
- }, executor);
+ },
+ executor);
deleteFutures.add(deleteFuture);
}
- Future<Void> waitFuture = FutureUtils.waitForAll(deleteFutures);
+ CompletableFuture<Void> waitFuture = FutureUtils.waitForAll(deleteFutures);
// make sure all delete operation have completed successfully
// in case of no lock, one of the delete operations should eventually fail
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 3209648..73827bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -22,9 +22,8 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
@@ -44,7 +43,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -173,7 +172,7 @@ public class BlobServerGetTest extends TestLogger {
final BlobStore blobStore = mock(BlobStore.class);
final int numberConcurrentGetOperations = 3;
- final List<Future<InputStream>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
+ final List<CompletableFuture<InputStream>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
final byte[] data = {1, 2, 3, 4, 99, 42};
final ByteArrayInputStream bais = new ByteArrayInputStream(data);
@@ -200,9 +199,8 @@ public class BlobServerGetTest extends TestLogger {
try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
for (int i = 0; i < numberConcurrentGetOperations; i++) {
- Future<InputStream> getOperation = FlinkCompletableFuture.supplyAsync(new Callable<InputStream>() {
- @Override
- public InputStream call() throws Exception {
+ CompletableFuture<InputStream> getOperation = CompletableFuture.supplyAsync(
+ () -> {
try (BlobClient blobClient = blobServer.createClient();
InputStream inputStream = blobClient.get(blobKey)) {
byte[] buffer = new byte[data.length];
@@ -210,14 +208,16 @@ public class BlobServerGetTest extends TestLogger {
IOUtils.readFully(inputStream, buffer);
return new ByteArrayInputStream(buffer);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not read blob for key " + blobKey + '.', e);
}
- }
- }, executor);
+ },
+ executor);
getOperations.add(getOperation);
}
- Future<Collection<InputStream>> inputStreamsFuture = FutureUtils.combineAll(getOperations);
+ CompletableFuture<Collection<InputStream>> inputStreamsFuture = FutureUtils.combineAll(getOperations);
Collection<InputStream> inputStreams = inputStreamsFuture.get();
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 8b8ddf9..80c6822 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -20,9 +20,8 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -37,7 +36,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Random;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -314,7 +313,7 @@ public class BlobServerPutTest extends TestLogger {
final CountDownLatch countDownLatch = new CountDownLatch(concurrentPutOperations);
final byte[] data = new byte[dataSize];
- ArrayList<Future<BlobKey>> allFutures = new ArrayList(concurrentPutOperations);
+ ArrayList<CompletableFuture<BlobKey>> allFutures = new ArrayList(concurrentPutOperations);
ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations);
@@ -322,14 +321,15 @@ public class BlobServerPutTest extends TestLogger {
final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
for (int i = 0; i < concurrentPutOperations; i++) {
- Future<BlobKey> putFuture = FlinkCompletableFuture.supplyAsync(new Callable<BlobKey>() {
- @Override
- public BlobKey call() throws Exception {
+ CompletableFuture<BlobKey> putFuture = CompletableFuture.supplyAsync(
+ () -> {
try (BlobClient blobClient = blobServer.createClient()) {
return blobClient.put(new BlockingInputStream(countDownLatch, data));
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not upload blob.", e);
}
- }
- }, executor);
+ },
+ executor);
allFutures.add(putFuture);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index e262459..cc95e7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.concurrent;
import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.util.TestLogger;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
@@ -31,6 +30,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.*;
@@ -58,9 +58,9 @@ public class FutureUtilsTest extends TestLogger{
try {
futureFactory.createFuture(Arrays.asList(
- new FlinkCompletableFuture<Object>(),
+ new CompletableFuture<>(),
null,
- new FlinkCompletableFuture<Object>()));
+ new CompletableFuture<>()));
fail();
} catch (NullPointerException ignored) {}
}
@@ -68,10 +68,10 @@ public class FutureUtilsTest extends TestLogger{
@Test
public void testConjunctFutureCompletion() throws Exception {
// some futures that we combine
- CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
- CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
- CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
- CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
// some future is initially completed
future2.complete(new Object());
@@ -79,10 +79,7 @@ public class FutureUtilsTest extends TestLogger{
// build the conjunct future
ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
- Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() {
- @Override
- public void accept(Object value) {}
- });
+ CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
assertEquals(4, result.getNumFuturesTotal());
assertEquals(1, result.getNumFuturesCompleted());
@@ -116,18 +113,15 @@ public class FutureUtilsTest extends TestLogger{
@Test
public void testConjunctFutureFailureOnFirst() throws Exception {
- CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
- CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
- CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
- CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
// build the conjunct future
ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
- Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() {
- @Override
- public void accept(Object value) {}
- });
+ CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
assertEquals(4, result.getNumFuturesTotal());
assertEquals(0, result.getNumFuturesCompleted());
@@ -158,19 +152,16 @@ public class FutureUtilsTest extends TestLogger{
@Test
public void testConjunctFutureFailureOnSuccessive() throws Exception {
- CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
- CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
- CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
- CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
// build the conjunct future
ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
assertEquals(4, result.getNumFuturesTotal());
- Future<?> resultMapped = result.thenAccept(new AcceptFunction<Object>() {
- @Override
- public void accept(Object value) {}
- });
+ java.util.concurrent.CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
future1.complete(new Object());
future3.complete(new Object());
@@ -202,11 +193,11 @@ public class FutureUtilsTest extends TestLogger{
*/
@Test
public void testConjunctFutureValue() throws ExecutionException, InterruptedException {
- CompletableFuture<Integer> future1 = FlinkCompletableFuture.completed(1);
- CompletableFuture<Long> future2 = FlinkCompletableFuture.completed(2L);
- CompletableFuture<Double> future3 = new FlinkCompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Integer> future1 = java.util.concurrent.CompletableFuture.completedFuture(1);
+ java.util.concurrent.CompletableFuture<Long> future2 = java.util.concurrent.CompletableFuture.completedFuture(2L);
+ java.util.concurrent.CompletableFuture<Double> future3 = new java.util.concurrent.CompletableFuture<>();
- ConjunctFuture<Collection<Number>> result = FutureUtils.<Number>combineAll(Arrays.asList(future1, future2, future3));
+ ConjunctFuture<Collection<Number>> result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3));
assertFalse(result.isDone());
@@ -219,7 +210,7 @@ public class FutureUtilsTest extends TestLogger{
@Test
public void testConjunctOfNone() throws Exception {
- final ConjunctFuture<?> result = futureFactory.createFuture(Collections.<Future<Object>>emptyList());
+ final ConjunctFuture<?> result = futureFactory.createFuture(Collections.<java.util.concurrent.CompletableFuture<Object>>emptyList());
assertEquals(0, result.getNumFuturesTotal());
assertEquals(0, result.getNumFuturesCompleted());
@@ -230,13 +221,13 @@ public class FutureUtilsTest extends TestLogger{
* Factory to create {@link ConjunctFuture} for testing.
*/
private interface FutureFactory {
- ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures);
+ ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures);
}
private static class ConjunctFutureFactory implements FutureFactory {
@Override
- public ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures) {
+ public ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
return FutureUtils.combineAll(futures);
}
}
@@ -244,7 +235,7 @@ public class FutureUtilsTest extends TestLogger{
private static class WaitingFutureFactory implements FutureFactory {
@Override
- public ConjunctFuture<?> createFuture(Collection<? extends Future<?>> futures) {
+ public ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
return FutureUtils.waitForAll(futures);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7e4694b8/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
index 2e6da98..c616501 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
@@ -34,6 +33,7 @@ import org.junit.Test;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import static org.mockito.Mockito.*;
@@ -51,12 +51,12 @@ public class ExecutionGraphUtilsTest {
final SimpleSlot slot2 = new SimpleSlot(createAllocatedSlot(jid, 1), owner, 1);
final SimpleSlot slot3 = new SimpleSlot(createAllocatedSlot(jid, 2), owner, 2);
- final FlinkCompletableFuture<SimpleSlot> incompleteFuture = new FlinkCompletableFuture<>();
+ final CompletableFuture<SimpleSlot> incompleteFuture = new CompletableFuture<>();
- final FlinkCompletableFuture<SimpleSlot> completeFuture = new FlinkCompletableFuture<>();
+ final CompletableFuture<SimpleSlot> completeFuture = new CompletableFuture<>();
completeFuture.complete(slot2);
- final FlinkCompletableFuture<SimpleSlot> disposedSlotFuture = new FlinkCompletableFuture<>();
+ final CompletableFuture<SimpleSlot> disposedSlotFuture = new CompletableFuture<>();
slot3.releaseSlot();
disposedSlotFuture.complete(slot3);
@@ -89,16 +89,16 @@ public class ExecutionGraphUtilsTest {
ExecutionAndSlot[] slots1 = new ExecutionAndSlot[] {
null,
- new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot1)),
+ new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot1)),
null,
- new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot2)),
+ new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot2)),
null
};
ExecutionAndSlot[] slots2 = new ExecutionAndSlot[] {
- new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot3)),
- new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot4)),
- new ExecutionAndSlot(mockExecution, FlinkCompletableFuture.completed(slot5))
+ new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot3)),
+ new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot4)),
+ new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot5))
};
List<ExecutionAndSlot[]> resources = Arrays.asList(null, slots1, new ExecutionAndSlot[0], null, slots2);