Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/05/11 15:39:46 UTC

[GitHub] flink pull request #3873: [FLINK-6555] [futures] Generalize ConjunctFuture t...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/3873

    [FLINK-6555] [futures] Generalize ConjunctFuture to return results

    The ConjunctFuture now returns the set of values of the individual futures it is composed of once it is completed.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink generalizeConjunctFuture

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3873.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3873
    
----
commit a6fc20d9f8cda04a835459f38ed885e87f3d478b
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-05-11T15:36:17Z

    [FLINK-6555] [futures] Generalize ConjunctFuture to return results
    
    The ConjunctFuture now returns the set of future values once it is completed.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3873: [FLINK-6555] [futures] Generalize ConjunctFuture t...

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3873



[GitHub] flink issue #3873: [FLINK-6555] [futures] Generalize ConjunctFuture to retur...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3873
  
    Updated the PR to incorporate the review feedback. Thanks for the review, @StephanEwen.



[GitHub] flink pull request #3873: [FLINK-6555] [futures] Generalize ConjunctFuture t...

Posted by StephanEwen <gi...@git.apache.org>.
GitHub user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3873#discussion_r116062457
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -163,26 +165,31 @@ public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures)
     	 * <p>Implementation notice: The member fields all have package-private access, because they are
     	 * either accessed by an inner subclass or by the enclosing class.
     	 */
    -	private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
    +	private static class ConjunctFutureImpl<T> extends FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<T> {
     
     		/** The total number of futures in the conjunction */
     		final int numTotal;
     
     		/** The number of futures in the conjunction that are already complete */
     		final AtomicInteger numCompleted = new AtomicInteger();
     
    +		final ArrayList<T> results;
    +
     		/** The function that is attached to all futures in the conjunction. Once a future
     		 * is complete, this function tracks the completion or fails the conjunct.  
     		 */
    -		final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
    +		final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable, Void>() {
     
     			@Override
    -			public Void apply(Object o, Throwable throwable) {
    +			public Void apply(T o, Throwable throwable) {
     				if (throwable != null) {
     					completeExceptionally(throwable);
    -				}
    -				else if (numTotal == numCompleted.incrementAndGet()) {
    -					complete(null);
    +				} else {
    +					results.add(o);
    --- End diff --
    
    Is this thread safe? My assumption is that many of the completion handlers can be called at the same time.



[GitHub] flink issue #3873: [FLINK-6555] [futures] Generalize ConjunctFuture to retur...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3873
  
    You're right about the thread safety. I will change it. I will also introduce a `WaitingFuture` that simply waits for all of its futures to complete and discards their values, thus returning `null` as its result.
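
For readers following the thread, the "wait and discard" behaviour described in this comment can be sketched as follows. This is only an illustration under stated assumptions: it uses the JDK's `java.util.concurrent.CompletableFuture` instead of Flink's own future classes, and the names `WaitingSketch` and `waitForAll` are hypothetical, not taken from the pull request.

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a "waiting" conjunction; not Flink's implementation. */
public final class WaitingSketch {

    /** Completes with null once every input future has completed successfully. */
    public static CompletableFuture<Void> waitForAll(
            Collection<? extends CompletableFuture<?>> futures) {

        final int numTotal = futures.size();
        final CompletableFuture<Void> result = new CompletableFuture<>();

        // Nothing to wait for: complete right away.
        if (numTotal == 0) {
            result.complete(null);
            return result;
        }

        // Only a counter is shared between handlers, so no result list is needed.
        final AtomicInteger numCompleted = new AtomicInteger();

        for (CompletableFuture<?> future : futures) {
            future.whenComplete((ignoredValue, throwable) -> {
                if (throwable != null) {
                    // The first failure fails the whole conjunction.
                    result.completeExceptionally(throwable);
                } else if (numCompleted.incrementAndGet() == numTotal) {
                    // The value of each future is discarded on purpose.
                    result.complete(null);
                }
            });
        }
        return result;
    }
}
```

Because the handlers share nothing but an `AtomicInteger`, they can run concurrently without further synchronization.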



[GitHub] flink pull request #3873: [FLINK-6555] [futures] Generalize ConjunctFuture t...

Posted by tillrohrmann <gi...@git.apache.org>.
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3873#discussion_r116169564
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -163,26 +165,31 @@ public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures)
     	 * <p>Implementation notice: The member fields all have package-private access, because they are
     	 * either accessed by an inner subclass or by the enclosing class.
     	 */
    -	private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
    +	private static class ConjunctFutureImpl<T> extends FlinkCompletableFuture<Collection<T>> implements ConjunctFuture<T> {
     
     		/** The total number of futures in the conjunction */
     		final int numTotal;
     
     		/** The number of futures in the conjunction that are already complete */
     		final AtomicInteger numCompleted = new AtomicInteger();
     
    +		final ArrayList<T> results;
    +
     		/** The function that is attached to all futures in the conjunction. Once a future
     		 * is complete, this function tracks the completion or fails the conjunct.  
     		 */
    -		final BiFunction<Object, Throwable, Void> completionHandler = new BiFunction<Object, Throwable, Void>() {
    +		final BiFunction<T, Throwable, Void> completionHandler = new BiFunction<T, Throwable, Void>() {
     
     			@Override
    -			public Void apply(Object o, Throwable throwable) {
    +			public Void apply(T o, Throwable throwable) {
     				if (throwable != null) {
     					completeExceptionally(throwable);
    -				}
    -				else if (numTotal == numCompleted.incrementAndGet()) {
    -					complete(null);
    +				} else {
    +					results.add(o);
    --- End diff --
    
    True, I wanted to add an atomic integer to determine the index but forgot about it. Thanks for catching it.
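
A minimal sketch of the atomic-index idea mentioned in this comment, for illustration only. It is written against the JDK's `java.util.concurrent.CompletableFuture` and not Flink's `FlinkCompletableFuture`; the names `ConjunctSketch`, `combineAll`, and `nextIndex` are assumptions made for the example, and this is not the code that was merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a result-collecting conjunction; not Flink's implementation. */
public final class ConjunctSketch {

    /** Completes with the values of all input futures once every one has completed. */
    public static <T> CompletableFuture<Collection<T>> combineAll(
            Collection<? extends CompletableFuture<? extends T>> futures) {

        final int numTotal = futures.size();
        final CompletableFuture<Collection<T>> result = new CompletableFuture<>();

        if (numTotal == 0) {
            result.complete(new ArrayList<T>());
            return result;
        }

        // Pre-sized array instead of ArrayList.add(): each handler claims its own slot.
        @SuppressWarnings("unchecked")
        final T[] results = (T[]) new Object[numTotal];

        // Hands out a unique slot index to each completing future.
        final AtomicInteger nextIndex = new AtomicInteger();
        // Counts futures whose value has already been stored.
        final AtomicInteger numCompleted = new AtomicInteger();

        for (CompletableFuture<? extends T> future : futures) {
            future.whenComplete((value, throwable) -> {
                if (throwable != null) {
                    result.completeExceptionally(throwable);
                } else {
                    // Store first, then count: the handler that observes the final
                    // count also sees every earlier write to the array.
                    results[nextIndex.getAndIncrement()] = value;
                    if (numCompleted.incrementAndGet() == numTotal) {
                        result.complete(new ArrayList<>(Arrays.asList(results)));
                    }
                }
            });
        }
        return result;
    }
}
```

Note that in this sketch the values end up in completion order, not in the order of the input collection. A plain `ArrayList.add` from concurrent handlers, as in the diff above, has no such slot separation, which is the concern raised in the review.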

