You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/10/29 15:41:30 UTC
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/4918
[FLINK-7940] Add FutureUtils.orTimeout
## What is the purpose of the change
This commit adds a convenience function `FutureUtils#orTimeout` which allows to easily add a timeout to
a CompletableFuture.
## Verifying this change
- Added `FutureUtilsTest#testOrTimeout`
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink orTimeout
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4918.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4918
----
commit 83e52f5458379855c481fe169f6be50a8afee336
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-10-29T15:01:18Z
[hotfix] Remove redundant FutureUtils#getFailedFuture
FutureUtils#completedExceptionally does exactly the same.
commit 94b3d14bc2ba64a4862bd83e670f3dbfccdf96b8
Author: Till Rohrmann <tr...@apache.org>
Date: 2017-10-29T15:38:53Z
[FLINK-7940] Add FutureUtils.orTimeout
This commit adds a convenience function which allows to easily add a timeout to
a CompletableFuture.
----
---
[GitHub] flink issue #4918: [FLINK-7940] Add FutureUtils.orTimeout
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/4918
Thanks for your review @zentol. I addressed your comments and once Travis gives green light, I'll merge it.
---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4918#discussion_r147658453
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
return result;
}
- // ------------------------------------------------------------------------
- // Future Completed with an exception.
- // ------------------------------------------------------------------------
+ /**
+ * Times the given future out after the timeout.
+ *
+ * @param future to time out
+ * @param timeout after which the given future is timed out
+ * @param timeUnit time unit of the timeout
+ * @param <T> type of the given future
+ * @return The timeout enriched future
+ */
+ public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+ final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+ future.whenComplete((T value, Throwable throwable) -> {
+ if (!timeoutFuture.isDone()) {
+ timeoutFuture.cancel(false);
+ }
+ });
+
+ return future;
+ }
/**
- * Returns a {@link CompletableFuture} that has failed with the exception
- * provided as argument.
- * @param throwable the exception to fail the future with.
- * @return The failed future.
+ * Runnable to complete the given future with a {@link TimeoutException}.
*/
- public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
- CompletableFuture<T> failedAttempt = new CompletableFuture<>();
- failedAttempt.completeExceptionally(throwable);
- return failedAttempt;
+ static final class Timeout implements Runnable {
--- End diff --
Yes will do.
---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4918#discussion_r147658822
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
return result;
}
- // ------------------------------------------------------------------------
- // Future Completed with an exception.
- // ------------------------------------------------------------------------
+ /**
+ * Times the given future out after the timeout.
+ *
+ * @param future to time out
+ * @param timeout after which the given future is timed out
+ * @param timeUnit time unit of the timeout
+ * @param <T> type of the given future
+ * @return The timeout enriched future
+ */
+ public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+ final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+ future.whenComplete((T value, Throwable throwable) -> {
+ if (!timeoutFuture.isDone()) {
+ timeoutFuture.cancel(false);
+ }
+ });
+
+ return future;
+ }
/**
- * Returns a {@link CompletableFuture} that has failed with the exception
- * provided as argument.
- * @param throwable the exception to fail the future with.
- * @return The failed future.
+ * Runnable to complete the given future with a {@link TimeoutException}.
*/
- public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
- CompletableFuture<T> failedAttempt = new CompletableFuture<>();
- failedAttempt.completeExceptionally(throwable);
- return failedAttempt;
+ static final class Timeout implements Runnable {
+
+ private final CompletableFuture<?> future;
+
+ Timeout(CompletableFuture<?> future) {
+ this.future = Preconditions.checkNotNull(future);
+ }
+
+ @Override
+ public void run() {
+ future.completeExceptionally(new TimeoutException());
+ }
+ }
+
+ /**
+ * Delay scheduler used to timeout futures.
+ *
+ * <p>This class creates a singleton scheduler used to run the provided actions.
+ */
+ private static final class Delayer {
+ static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
+ 1,
+ new ExecutorThreadFactory("CompletableFutureDelayScheduler"));
--- End diff --
Good point. Will add it.
---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4918#discussion_r147658765
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
return result;
}
- // ------------------------------------------------------------------------
- // Future Completed with an exception.
- // ------------------------------------------------------------------------
+ /**
+ * Times the given future out after the timeout.
+ *
+ * @param future to time out
+ * @param timeout after which the given future is timed out
+ * @param timeUnit time unit of the timeout
+ * @param <T> type of the given future
+ * @return The timeout enriched future
+ */
+ public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+ final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+ future.whenComplete((T value, Throwable throwable) -> {
+ if (!timeoutFuture.isDone()) {
+ timeoutFuture.cancel(false);
+ }
+ });
+
+ return future;
+ }
/**
- * Returns a {@link CompletableFuture} that has failed with the exception
- * provided as argument.
- * @param throwable the exception to fail the future with.
- * @return The failed future.
+ * Runnable to complete the given future with a {@link TimeoutException}.
*/
- public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
- CompletableFuture<T> failedAttempt = new CompletableFuture<>();
- failedAttempt.completeExceptionally(throwable);
- return failedAttempt;
+ static final class Timeout implements Runnable {
+
+ private final CompletableFuture<?> future;
+
+ Timeout(CompletableFuture<?> future) {
+ this.future = Preconditions.checkNotNull(future);
+ }
+
+ @Override
+ public void run() {
+ future.completeExceptionally(new TimeoutException());
--- End diff --
Not entirely sure, since people might use this to disambiguate different timeouts from each other. I rather not offer this possibility.
---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/4918
---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4918#discussion_r147594496
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
return result;
}
- // ------------------------------------------------------------------------
- // Future Completed with an exception.
- // ------------------------------------------------------------------------
+ /**
+ * Times the given future out after the timeout.
+ *
+ * @param future to time out
+ * @param timeout after which the given future is timed out
+ * @param timeUnit time unit of the timeout
+ * @param <T> type of the given future
+ * @return The timeout enriched future
+ */
+ public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+ final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+ future.whenComplete((T value, Throwable throwable) -> {
+ if (!timeoutFuture.isDone()) {
+ timeoutFuture.cancel(false);
+ }
+ });
+
+ return future;
+ }
/**
- * Returns a {@link CompletableFuture} that has failed with the exception
- * provided as argument.
- * @param throwable the exception to fail the future with.
- * @return The failed future.
+ * Runnable to complete the given future with a {@link TimeoutException}.
*/
- public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
- CompletableFuture<T> failedAttempt = new CompletableFuture<>();
- failedAttempt.completeExceptionally(throwable);
- return failedAttempt;
+ static final class Timeout implements Runnable {
+
+ private final CompletableFuture<?> future;
+
+ Timeout(CompletableFuture<?> future) {
+ this.future = Preconditions.checkNotNull(future);
+ }
+
+ @Override
+ public void run() {
+ future.completeExceptionally(new TimeoutException());
--- End diff --
Could be useful to have to used timeout in the exception message.
---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4918#discussion_r147594412
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
return result;
}
- // ------------------------------------------------------------------------
- // Future Completed with an exception.
- // ------------------------------------------------------------------------
+ /**
+ * Times the given future out after the timeout.
+ *
+ * @param future to time out
+ * @param timeout after which the given future is timed out
+ * @param timeUnit time unit of the timeout
+ * @param <T> type of the given future
+ * @return The timeout enriched future
+ */
+ public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+ final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+ future.whenComplete((T value, Throwable throwable) -> {
+ if (!timeoutFuture.isDone()) {
+ timeoutFuture.cancel(false);
+ }
+ });
+
+ return future;
+ }
/**
- * Returns a {@link CompletableFuture} that has failed with the exception
- * provided as argument.
- * @param throwable the exception to fail the future with.
- * @return The failed future.
+ * Runnable to complete the given future with a {@link TimeoutException}.
*/
- public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
- CompletableFuture<T> failedAttempt = new CompletableFuture<>();
- failedAttempt.completeExceptionally(throwable);
- return failedAttempt;
+ static final class Timeout implements Runnable {
+
+ private final CompletableFuture<?> future;
+
+ Timeout(CompletableFuture<?> future) {
+ this.future = Preconditions.checkNotNull(future);
+ }
+
+ @Override
+ public void run() {
+ future.completeExceptionally(new TimeoutException());
+ }
+ }
+
+ /**
+ * Delay scheduler used to timeout futures.
+ *
+ * <p>This class creates a singleton scheduler used to run the provided actions.
+ */
+ private static final class Delayer {
+ static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
+ 1,
+ new ExecutorThreadFactory("CompletableFutureDelayScheduler"));
--- End diff --
Let's add a "Flink" prefix to the thread name.
---
[GitHub] flink pull request #4918: [FLINK-7940] Add FutureUtils.orTimeout
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4918#discussion_r147594462
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
@@ -446,19 +449,67 @@ public void onComplete(Throwable failure, T success) throws Throwable {
return result;
}
- // ------------------------------------------------------------------------
- // Future Completed with an exception.
- // ------------------------------------------------------------------------
+ /**
+ * Times the given future out after the timeout.
+ *
+ * @param future to time out
+ * @param timeout after which the given future is timed out
+ * @param timeUnit time unit of the timeout
+ * @param <T> type of the given future
+ * @return The timeout enriched future
+ */
+ public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+ final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+ future.whenComplete((T value, Throwable throwable) -> {
+ if (!timeoutFuture.isDone()) {
+ timeoutFuture.cancel(false);
+ }
+ });
+
+ return future;
+ }
/**
- * Returns a {@link CompletableFuture} that has failed with the exception
- * provided as argument.
- * @param throwable the exception to fail the future with.
- * @return The failed future.
+ * Runnable to complete the given future with a {@link TimeoutException}.
*/
- public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
- CompletableFuture<T> failedAttempt = new CompletableFuture<>();
- failedAttempt.completeExceptionally(throwable);
- return failedAttempt;
+ static final class Timeout implements Runnable {
--- End diff --
make private?
---