Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/08/01 12:16:02 UTC

[jira] [Commented] (FLINK-7317) Remove Flink's futures from ExecutionGraph

    [ https://issues.apache.org/jira/browse/FLINK-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16108797#comment-16108797 ] 

ASF GitHub Bot commented on FLINK-7317:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4433#discussion_r130590959
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -50,42 +49,38 @@
     	 * @param <T> type of the result
     	 * @return Future containing either the result of the operation or a {@link RetryException}
     	 */
    -	public static <T> Future<T> retry(
    -		final Callable<Future<T>> operation,
    +	public static <T> java.util.concurrent.CompletableFuture<T> retry(
    +		final Callable<java.util.concurrent.CompletableFuture<T>> operation,
     		final int retries,
     		final Executor executor) {
     
    -		Future<T> operationResultFuture;
    +		java.util.concurrent.CompletableFuture<T> operationResultFuture;
     
     		try {
     			operationResultFuture = operation.call();
     		} catch (Exception e) {
    -			return FlinkCompletableFuture.completedExceptionally(
    -				new RetryException("Could not execute the provided operation.", e));
    +			java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
    +			exceptionResult.completeExceptionally(new RetryException("Could not execute the provided operation.", e));
    +			return exceptionResult;
     		}
     
    -		return operationResultFuture.handleAsync(new BiFunction<T, Throwable, Future<T>>() {
    -			@Override
    -			public Future<T> apply(T t, Throwable throwable) {
    +		return operationResultFuture.handleAsync(
    +			(t, throwable) -> {
     				if (throwable != null) {
     					if (retries > 0) {
     						return retry(operation, retries - 1, executor);
     					} else {
    -						return FlinkCompletableFuture.completedExceptionally(
    -							new RetryException("Could not complete the operation. Number of retries " +
    -								"has been exhausted.", throwable));
    +						java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
    +						exceptionResult.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
    +							"has been exhausted.", throwable));
    +						return exceptionResult;
     					}
     				} else {
    -					return FlinkCompletableFuture.completed(t);
    +					return java.util.concurrent.CompletableFuture.completedFuture(t);
     				}
    -			}
    -		}, executor)
    -		.thenCompose(new ApplyFunction<Future<T>, Future<T>>() {
    -			@Override
    -			public Future<T> apply(Future<T> value) {
    -				return value;
    -			}
    -		});
    +			},
    +			executor)
    +		.thenCompose(value -> value);
    --- End diff --
    
    The trailing `thenCompose(value -> value)` is there to flatten the `CompletableFuture<CompletableFuture<T>>` returned by `handleAsync` into a `CompletableFuture<T>`.
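
For readers following the thread, here is a minimal standalone sketch of the flattening step, assuming plain `java.util.concurrent.CompletableFuture` and nothing from Flink. The class `FlattenExample` and the method `flatten` are hypothetical names used only for illustration. They mirror the shape of the lambda in the diff, where the handler itself returns a future, so `handleAsync` yields a nested future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

// Hypothetical illustration class; not part of Flink.
public class FlattenExample {

    // Mirrors the pattern in the diff: the handler returns a future,
    // so handleAsync produces a CompletableFuture<CompletableFuture<String>>.
    static CompletableFuture<String> flatten(CompletableFuture<String> input, Executor executor) {
        CompletableFuture<CompletableFuture<String>> nested = input.handleAsync(
            (value, throwable) -> {
                if (throwable != null) {
                    // Propagate the failure through an exceptionally completed future.
                    CompletableFuture<String> failed = new CompletableFuture<>();
                    failed.completeExceptionally(throwable);
                    return failed;
                } else {
                    return CompletableFuture.completedFuture(value);
                }
            },
            executor);

        // thenCompose with the identity function unwraps one level of nesting.
        return nested.thenCompose(inner -> inner);
    }

    public static void main(String[] args) {
        CompletableFuture<String> result =
            flatten(CompletableFuture.completedFuture("ok"), ForkJoinPool.commonPool());
        System.out.println(result.join());
    }
}
```

Without the final `thenCompose`, callers would receive the outer future and would have to unwrap the inner one themselves. With it, a failure of the inner future surfaces directly on the returned future.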


> Remove Flink's futures from ExecutionGraph
> ------------------------------------------
>
>                 Key: FLINK-7317
>                 URL: https://issues.apache.org/jira/browse/FLINK-7317
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)