Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/10/29 15:41:30 UTC

[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4918

    [FLINK-7940] Add FutureUtils.orTimeout

    ## What is the purpose of the change
    
    This commit adds a convenience function `FutureUtils#orTimeout` which makes it easy to add a timeout to
    a CompletableFuture.
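
For readers without the Flink sources at hand, the idea can be sketched in plain JDK code. This is a minimal sketch, not the Flink implementation: the class name `OrTimeoutSketch`, the thread name, and the use of `Executors.newSingleThreadScheduledExecutor` are assumptions made for illustration, while the shape of `orTimeout` follows the diff quoted in this thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the helper described above; not Flink's FutureUtils.
final class OrTimeoutSketch {

    // Single daemon thread whose only job is to fire timeouts (assumed setup).
    private static final ScheduledExecutorService DELAYER =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "timeout-sketch-delayer");
            thread.setDaemon(true);
            return thread;
        });

    static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
        // Fail the future once the delay elapses; this is a no-op if it is already complete.
        ScheduledFuture<?> timeoutTask = DELAYER.schedule(
            () -> { future.completeExceptionally(new TimeoutException()); },
            timeout,
            timeUnit);

        // Drop the pending timeout task as soon as the future completes on its own.
        future.whenComplete((value, throwable) -> timeoutTask.cancel(false));

        // The same future instance is returned, now guarded by the timeout.
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending =
            orTimeout(new CompletableFuture<>(), 50, TimeUnit.MILLISECONDS);
        // handle() exposes the failure without rethrowing it.
        Throwable failure = pending.handle((value, throwable) -> throwable).join();
        System.out.println(failure instanceof TimeoutException);
    }
}
```

Note that the helper mutates and returns the future it was given, so every holder of that future observes the timeout.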
    
    ## Verifying this change
    
    - Added `FutureUtilsTest#testOrTimeout`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink orTimeout

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4918.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4918
    
----
commit 83e52f5458379855c481fe169f6be50a8afee336
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-10-29T15:01:18Z

    [hotfix] Remove redundant FutureUtils#getFailedFuture
    
    FutureUtils#completedExceptionally does exactly the same.

commit 94b3d14bc2ba64a4862bd83e670f3dbfccdf96b8
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-10-29T15:38:53Z

    [FLINK-7940] Add FutureUtils.orTimeout
    
    This commit adds a convenience function which makes it easy to add a timeout to
    a CompletableFuture.

----


---

[GitHub] flink issue #4918: [FLINK-7940] Add FutureUtils.orTimeout

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4918
  
    Thanks for your review @zentol. I addressed your comments and once Travis gives the green light, I'll merge it.


---

[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4918#discussion_r147658453
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
     		return result;
     	}
     
    -	// ------------------------------------------------------------------------
    -	//  Future Completed with an exception.
    -	// ------------------------------------------------------------------------
    +	/**
    +	 * Times the given future out after the timeout.
    +	 *
    +	 * @param future to time out
    +	 * @param timeout after which the given future is timed out
    +	 * @param timeUnit time unit of the timeout
    +	 * @param <T> type of the given future
    +	 * @return The timeout enriched future
    +	 */
    +	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
    +		final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
    +
    +		future.whenComplete((T value, Throwable throwable) -> {
    +			if (!timeoutFuture.isDone()) {
    +				timeoutFuture.cancel(false);
    +			}
    +		});
    +
    +		return future;
    +	}
     
     	/**
    -	 * Returns a {@link CompletableFuture} that has failed with the exception
    -	 * provided as argument.
    -	 * @param throwable the exception to fail the future with.
    -	 * @return The failed future.
    +	 * Runnable to complete the given future with a {@link TimeoutException}.
     	 */
    -	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
    -		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
    -		failedAttempt.completeExceptionally(throwable);
    -		return failedAttempt;
    +	static final class Timeout implements Runnable {
    --- End diff --
    
    Yes will do.


---

[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4918#discussion_r147658822
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
     		return result;
     	}
     
    -	// ------------------------------------------------------------------------
    -	//  Future Completed with an exception.
    -	// ------------------------------------------------------------------------
    +	/**
    +	 * Times the given future out after the timeout.
    +	 *
    +	 * @param future to time out
    +	 * @param timeout after which the given future is timed out
    +	 * @param timeUnit time unit of the timeout
    +	 * @param <T> type of the given future
    +	 * @return The timeout enriched future
    +	 */
    +	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
    +		final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
    +
    +		future.whenComplete((T value, Throwable throwable) -> {
    +			if (!timeoutFuture.isDone()) {
    +				timeoutFuture.cancel(false);
    +			}
    +		});
    +
    +		return future;
    +	}
     
     	/**
    -	 * Returns a {@link CompletableFuture} that has failed with the exception
    -	 * provided as argument.
    -	 * @param throwable the exception to fail the future with.
    -	 * @return The failed future.
    +	 * Runnable to complete the given future with a {@link TimeoutException}.
     	 */
    -	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
    -		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
    -		failedAttempt.completeExceptionally(throwable);
    -		return failedAttempt;
    +	static final class Timeout implements Runnable {
    +
    +		private final CompletableFuture<?> future;
    +
    +		Timeout(CompletableFuture<?> future) {
    +			this.future = Preconditions.checkNotNull(future);
    +		}
    +
    +		@Override
    +		public void run() {
    +			future.completeExceptionally(new TimeoutException());
    +		}
    +	}
    +
    +	/**
    +	 * Delay scheduler used to timeout futures.
    +	 *
    +	 * <p>This class creates a singleton scheduler used to run the provided actions.
    +	 */
    +	private static final class Delayer {
    +		static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
    +			1,
    +			new ExecutorThreadFactory("CompletableFutureDelayScheduler"));
    --- End diff --
    
    Good point. Will add it.


---

[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4918#discussion_r147658765
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
     		return result;
     	}
     
    -	// ------------------------------------------------------------------------
    -	//  Future Completed with an exception.
    -	// ------------------------------------------------------------------------
    +	/**
    +	 * Times the given future out after the timeout.
    +	 *
    +	 * @param future to time out
    +	 * @param timeout after which the given future is timed out
    +	 * @param timeUnit time unit of the timeout
    +	 * @param <T> type of the given future
    +	 * @return The timeout enriched future
    +	 */
    +	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
    +		final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
    +
    +		future.whenComplete((T value, Throwable throwable) -> {
    +			if (!timeoutFuture.isDone()) {
    +				timeoutFuture.cancel(false);
    +			}
    +		});
    +
    +		return future;
    +	}
     
     	/**
    -	 * Returns a {@link CompletableFuture} that has failed with the exception
    -	 * provided as argument.
    -	 * @param throwable the exception to fail the future with.
    -	 * @return The failed future.
    +	 * Runnable to complete the given future with a {@link TimeoutException}.
     	 */
    -	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
    -		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
    -		failedAttempt.completeExceptionally(throwable);
    -		return failedAttempt;
    +	static final class Timeout implements Runnable {
    +
    +		private final CompletableFuture<?> future;
    +
    +		Timeout(CompletableFuture<?> future) {
    +			this.future = Preconditions.checkNotNull(future);
    +		}
    +
    +		@Override
    +		public void run() {
    +			future.completeExceptionally(new TimeoutException());
    --- End diff --
    
    Not entirely sure, since people might use this to disambiguate different timeouts from each other. I'd rather not offer this possibility.


---

[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4918


---

[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4918#discussion_r147594496
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
     		return result;
     	}
     
    -	// ------------------------------------------------------------------------
    -	//  Future Completed with an exception.
    -	// ------------------------------------------------------------------------
    +	/**
    +	 * Times the given future out after the timeout.
    +	 *
    +	 * @param future to time out
    +	 * @param timeout after which the given future is timed out
    +	 * @param timeUnit time unit of the timeout
    +	 * @param <T> type of the given future
    +	 * @return The timeout enriched future
    +	 */
    +	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
    +		final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
    +
    +		future.whenComplete((T value, Throwable throwable) -> {
    +			if (!timeoutFuture.isDone()) {
    +				timeoutFuture.cancel(false);
    +			}
    +		});
    +
    +		return future;
    +	}
     
     	/**
    -	 * Returns a {@link CompletableFuture} that has failed with the exception
    -	 * provided as argument.
    -	 * @param throwable the exception to fail the future with.
    -	 * @return The failed future.
    +	 * Runnable to complete the given future with a {@link TimeoutException}.
     	 */
    -	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
    -		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
    -		failedAttempt.completeExceptionally(throwable);
    -		return failedAttempt;
    +	static final class Timeout implements Runnable {
    +
    +		private final CompletableFuture<?> future;
    +
    +		Timeout(CompletableFuture<?> future) {
    +			this.future = Preconditions.checkNotNull(future);
    +		}
    +
    +		@Override
    +		public void run() {
    +			future.completeExceptionally(new TimeoutException());
    --- End diff --
    
    Could be useful to have the used timeout in the exception message.
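
A rough sketch of what this suggestion could look like, assuming the `Timeout` runnable were also handed the timeout value and unit. The class name `TimeoutWithMessage`, the extra constructor parameters, and the message wording are hypothetical and not part of the pull request; `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull`.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical variant of the Timeout runnable that reports the configured timeout.
final class TimeoutWithMessage implements Runnable {

    private final CompletableFuture<?> future;
    private final long timeout;
    private final TimeUnit timeUnit;

    TimeoutWithMessage(CompletableFuture<?> future, long timeout, TimeUnit timeUnit) {
        this.future = Objects.requireNonNull(future);
        this.timeout = timeout;
        this.timeUnit = Objects.requireNonNull(timeUnit);
    }

    @Override
    public void run() {
        // TimeUnit renders as its enum constant name, e.g. MILLISECONDS.
        future.completeExceptionally(
            new TimeoutException("Future timed out after " + timeout + " " + timeUnit));
    }
}
```

The trade-off is that callers could then start parsing the message to tell timeouts apart, which couples them to the wording.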


---

[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4918#discussion_r147594412
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
     		return result;
     	}
     
    -	// ------------------------------------------------------------------------
    -	//  Future Completed with an exception.
    -	// ------------------------------------------------------------------------
    +	/**
    +	 * Times the given future out after the timeout.
    +	 *
    +	 * @param future to time out
    +	 * @param timeout after which the given future is timed out
    +	 * @param timeUnit time unit of the timeout
    +	 * @param <T> type of the given future
    +	 * @return The timeout enriched future
    +	 */
    +	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
    +		final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
    +
    +		future.whenComplete((T value, Throwable throwable) -> {
    +			if (!timeoutFuture.isDone()) {
    +				timeoutFuture.cancel(false);
    +			}
    +		});
    +
    +		return future;
    +	}
     
     	/**
    -	 * Returns a {@link CompletableFuture} that has failed with the exception
    -	 * provided as argument.
    -	 * @param throwable the exception to fail the future with.
    -	 * @return The failed future.
    +	 * Runnable to complete the given future with a {@link TimeoutException}.
     	 */
    -	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
    -		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
    -		failedAttempt.completeExceptionally(throwable);
    -		return failedAttempt;
    +	static final class Timeout implements Runnable {
    +
    +		private final CompletableFuture<?> future;
    +
    +		Timeout(CompletableFuture<?> future) {
    +			this.future = Preconditions.checkNotNull(future);
    +		}
    +
    +		@Override
    +		public void run() {
    +			future.completeExceptionally(new TimeoutException());
    +		}
    +	}
    +
    +	/**
    +	 * Delay scheduler used to timeout futures.
    +	 *
    +	 * <p>This class creates a singleton scheduler used to run the provided actions.
    +	 */
    +	private static final class Delayer {
    +		static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
    +			1,
    +			new ExecutorThreadFactory("CompletableFutureDelayScheduler"));
    --- End diff --
    
    Let's add a "Flink" prefix to the thread name.
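
As an illustration of the naming suggestion, here is a minimal sketch using only JDK classes. Flink's `ExecutorThreadFactory` is replaced by a hand-written `ThreadFactory`; the class `NamedDelayerSketch`, the `-thread-N` suffix, and the exact prefixed name are assumptions for this example.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper showing a delayer whose threads carry a "Flink" prefix.
final class NamedDelayerSketch {

    // Builds daemon threads named "<prefix>-thread-<n>" (naming scheme assumed).
    static ThreadFactory namedDaemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    // Single-threaded scheduler; its thread is easy to spot in a thread dump.
    static ScheduledThreadPoolExecutor newDelayer() {
        return new ScheduledThreadPoolExecutor(
            1,
            namedDaemonFactory("FlinkCompletableFutureDelayScheduler"));
    }
}
```

A recognizable prefix makes it easier to attribute the thread to Flink when reading thread dumps of a JVM that hosts several libraries.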


---

[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4918#discussion_r147594462
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
     		return result;
     	}
     
    -	// ------------------------------------------------------------------------
    -	//  Future Completed with an exception.
    -	// ------------------------------------------------------------------------
    +	/**
    +	 * Times the given future out after the timeout.
    +	 *
    +	 * @param future to time out
    +	 * @param timeout after which the given future is timed out
    +	 * @param timeUnit time unit of the timeout
    +	 * @param <T> type of the given future
    +	 * @return The timeout enriched future
    +	 */
    +	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
    +		final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
    +
    +		future.whenComplete((T value, Throwable throwable) -> {
    +			if (!timeoutFuture.isDone()) {
    +				timeoutFuture.cancel(false);
    +			}
    +		});
    +
    +		return future;
    +	}
     
     	/**
    -	 * Returns a {@link CompletableFuture} that has failed with the exception
    -	 * provided as argument.
    -	 * @param throwable the exception to fail the future with.
    -	 * @return The failed future.
    +	 * Runnable to complete the given future with a {@link TimeoutException}.
     	 */
    -	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
    -		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
    -		failedAttempt.completeExceptionally(throwable);
    -		return failedAttempt;
    +	static final class Timeout implements Runnable {
    --- End diff --
    
    make private?


---