You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/08/01 12:16:02 UTC
[jira] [Commented] (FLINK-7317) Remove Flink's futures from
ExecutionGraph
[ https://issues.apache.org/jira/browse/FLINK-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16108797#comment-16108797 ]
ASF GitHub Bot commented on FLINK-7317:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4433#discussion_r130590959
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -50,42 +49,38 @@
* @param <T> type of the result
* @return Future containing either the result of the operation or a {@link RetryException}
*/
- public static <T> Future<T> retry(
- final Callable<Future<T>> operation,
+ public static <T> java.util.concurrent.CompletableFuture<T> retry(
+ final Callable<java.util.concurrent.CompletableFuture<T>> operation,
final int retries,
final Executor executor) {
- Future<T> operationResultFuture;
+ java.util.concurrent.CompletableFuture<T> operationResultFuture;
try {
operationResultFuture = operation.call();
} catch (Exception e) {
- return FlinkCompletableFuture.completedExceptionally(
- new RetryException("Could not execute the provided operation.", e));
+ java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
+ exceptionResult.completeExceptionally(new RetryException("Could not execute the provided operation.", e));
+ return exceptionResult;
}
- return operationResultFuture.handleAsync(new BiFunction<T, Throwable, Future<T>>() {
- @Override
- public Future<T> apply(T t, Throwable throwable) {
+ return operationResultFuture.handleAsync(
+ (t, throwable) -> {
if (throwable != null) {
if (retries > 0) {
return retry(operation, retries - 1, executor);
} else {
- return FlinkCompletableFuture.completedExceptionally(
- new RetryException("Could not complete the operation. Number of retries " +
- "has been exhausted.", throwable));
+ java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
+ exceptionResult.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
+ "has been exhausted.", throwable));
+ return exceptionResult;
}
} else {
- return FlinkCompletableFuture.completed(t);
+ return java.util.concurrent.CompletableFuture.completedFuture(t);
}
- }
- }, executor)
- .thenCompose(new ApplyFunction<Future<T>, Future<T>>() {
- @Override
- public Future<T> apply(Future<T> value) {
- return value;
- }
- });
+ },
+ executor)
+ .thenCompose(value -> value);
--- End diff --
To flatten the `CompletableFuture<CompletableFuture<T>>` into `CompletableFuture<T>`
> Remove Flink's futures from ExecutionGraph
> ------------------------------------------
>
> Key: FLINK-7317
> URL: https://issues.apache.org/jira/browse/FLINK-7317
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.4.0
>
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)