You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/07/31 16:22:13 UTC

[GitHub] flink pull request #4433: [FLINK-7317] [futures] Replace Flink's futures wit...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4433

    [FLINK-7317] [futures] Replace Flink's futures with Java 8's CompletableFuture in ExecutionGraph

    ## What is the purpose of the change
    
    Replace Flink's `Futures` with Java 8's `CompletableFuture` in `ExecutionGraph`
    
    This PR is based #4429, #4431, #4432, #4430.
    
    ## Brief change log
    
    *(for example:)*
      - Use `CompletableFuture` in `ExecutionGraph`
      - Change `FutureUtils#retry` to work with `CompletableFuture`
      - Let `ConjunctFuture` extend from `CompletableFuture`
    
    ## Verifying this change
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink rfScheduling

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4433.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4433
    
----
commit afe1d171132bb3724e672a1c4ce74a3f7c185908
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-31T13:07:18Z

    [FLINK-7313] [futures] Add Flink future and Scala future to Java 8 CompletableFuture conversion
    
    Add DirectExecutionContext
    
    Add Scala Future to Java 8 CompletableFuture utility to FutureUtils
    
    Add Flink future to Java 8's CompletableFuture conversion utility to FutureUtils
    
    Add base class for Flink's unchecked future exceptions

commit 84dbf47a47e3f73bcf52db112efc36cb47f43180
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-31T15:55:06Z

    [FLINK-7318] [futures] Replace Flink's futures in StackTraceSampleCoordinator with Java 8 CompletableFuture

commit 2fa9a9ebe5c669b6b9b20ec427d3faf10ffe5712
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-31T13:53:48Z

    [FLINK-7314] [futures] Replace Flink's futures with CompletableFuture in TaskManagerLogHandler

commit f0344e4b554f440f966cd481ac245941d8a465ad
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-31T16:06:20Z

    [FLINK-7319] [futures] Replace Flink's Futures with Java 8 CompletableFuture in MesosResourceManager

commit d37fc6bd262d57324ee5f5b6e163b7450de9e286
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-31T14:33:21Z

    [FLINK-7317] [futures] Replace Flink's futures with Java 8's CompletableFuture in ExecutionGraph
    
    Change FutureUtils.retry to work with CompletableFutures
    
    Let ConjunctFutures extends CompletableFuture
    
    Remove Flink's futures from ExecutionGraph

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4433: [FLINK-7317] [futures] Replace Flink's futures with Java ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4433
  
    Travis has passed with an unrelated test failure (end to end Kafka test). Merging this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4433: [FLINK-7317] [futures] Replace Flink's futures wit...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4433


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4433: [FLINK-7317] [futures] Replace Flink's futures wit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4433#discussion_r130590959
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -50,42 +49,38 @@
     	 * @param <T> type of the result
     	 * @return Future containing either the result of the operation or a {@link RetryException}
     	 */
    -	public static <T> Future<T> retry(
    -		final Callable<Future<T>> operation,
    +	public static <T> java.util.concurrent.CompletableFuture<T> retry(
    +		final Callable<java.util.concurrent.CompletableFuture<T>> operation,
     		final int retries,
     		final Executor executor) {
     
    -		Future<T> operationResultFuture;
    +		java.util.concurrent.CompletableFuture<T> operationResultFuture;
     
     		try {
     			operationResultFuture = operation.call();
     		} catch (Exception e) {
    -			return FlinkCompletableFuture.completedExceptionally(
    -				new RetryException("Could not execute the provided operation.", e));
    +			java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
    +			exceptionResult.completeExceptionally(new RetryException("Could not execute the provided operation.", e));
    +			return exceptionResult;
     		}
     
    -		return operationResultFuture.handleAsync(new BiFunction<T, Throwable, Future<T>>() {
    -			@Override
    -			public Future<T> apply(T t, Throwable throwable) {
    +		return operationResultFuture.handleAsync(
    +			(t, throwable) -> {
     				if (throwable != null) {
     					if (retries > 0) {
     						return retry(operation, retries - 1, executor);
     					} else {
    -						return FlinkCompletableFuture.completedExceptionally(
    -							new RetryException("Could not complete the operation. Number of retries " +
    -								"has been exhausted.", throwable));
    +						java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
    +						exceptionResult.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
    +							"has been exhausted.", throwable));
    +						return exceptionResult;
     					}
     				} else {
    -					return FlinkCompletableFuture.completed(t);
    +					return java.util.concurrent.CompletableFuture.completedFuture(t);
     				}
    -			}
    -		}, executor)
    -		.thenCompose(new ApplyFunction<Future<T>, Future<T>>() {
    -			@Override
    -			public Future<T> apply(Future<T> value) {
    -				return value;
    -			}
    -		});
    +			},
    +			executor)
    +		.thenCompose(value -> value);
    --- End diff --
    
    To flatten the `CompletableFuture<CompletableFuture<T>>` into `CompletableFuture<T>`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4433: [FLINK-7317] [futures] Replace Flink's futures wit...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4433#discussion_r130591322
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -199,25 +194,19 @@ public RetryException(Throwable cause) {
     		/** The function that is attached to all futures in the conjunction. Once a future
    --- End diff --
    
    I think it should still be valid.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4433: [FLINK-7317] [futures] Replace Flink's futures with Java ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4433
  
    Thanks for your review @zentol. I'll rebase the PR and let it run again on Travis.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4433: [FLINK-7317] [futures] Replace Flink's futures wit...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4433#discussion_r130401027
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -199,25 +194,19 @@ public RetryException(Throwable cause) {
     		/** The function that is attached to all futures in the conjunction. Once a future
    --- End diff --
    
    javadoc seems outdated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4433: [FLINK-7317] [futures] Replace Flink's futures wit...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4433#discussion_r130400792
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -50,42 +49,38 @@
     	 * @param <T> type of the result
     	 * @return Future containing either the result of the operation or a {@link RetryException}
     	 */
    -	public static <T> Future<T> retry(
    -		final Callable<Future<T>> operation,
    +	public static <T> java.util.concurrent.CompletableFuture<T> retry(
    +		final Callable<java.util.concurrent.CompletableFuture<T>> operation,
     		final int retries,
     		final Executor executor) {
     
    -		Future<T> operationResultFuture;
    +		java.util.concurrent.CompletableFuture<T> operationResultFuture;
     
     		try {
     			operationResultFuture = operation.call();
     		} catch (Exception e) {
    -			return FlinkCompletableFuture.completedExceptionally(
    -				new RetryException("Could not execute the provided operation.", e));
    +			java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
    +			exceptionResult.completeExceptionally(new RetryException("Could not execute the provided operation.", e));
    +			return exceptionResult;
     		}
     
    -		return operationResultFuture.handleAsync(new BiFunction<T, Throwable, Future<T>>() {
    -			@Override
    -			public Future<T> apply(T t, Throwable throwable) {
    +		return operationResultFuture.handleAsync(
    +			(t, throwable) -> {
     				if (throwable != null) {
     					if (retries > 0) {
     						return retry(operation, retries - 1, executor);
     					} else {
    -						return FlinkCompletableFuture.completedExceptionally(
    -							new RetryException("Could not complete the operation. Number of retries " +
    -								"has been exhausted.", throwable));
    +						java.util.concurrent.CompletableFuture<T> exceptionResult = new java.util.concurrent.CompletableFuture<>();
    +						exceptionResult.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
    +							"has been exhausted.", throwable));
    +						return exceptionResult;
     					}
     				} else {
    -					return FlinkCompletableFuture.completed(t);
    +					return java.util.concurrent.CompletableFuture.completedFuture(t);
     				}
    -			}
    -		}, executor)
    -		.thenCompose(new ApplyFunction<Future<T>, Future<T>>() {
    -			@Override
    -			public Future<T> apply(Future<T> value) {
    -				return value;
    -			}
    -		});
    +			},
    +			executor)
    +		.thenCompose(value -> value);
    --- End diff --
    
    what is this for?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---