You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/30 23:09:55 UTC
[1/2] flink git commit: [hotfix] Remove redundant FutureUtils#getFailedFuture
Repository: flink
Updated Branches:
refs/heads/master 152f6c9af -> c568aed1e
[hotfix] Remove redundant FutureUtils#getFailedFuture
FutureUtils#completedExceptionally does exactly the same.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/747cf821
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/747cf821
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/747cf821
Branch: refs/heads/master
Commit: 747cf821f96eb559c28af39e2150a134224214b1
Parents: 152f6c9
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Oct 29 16:01:18 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 31 00:08:52 2017 +0100
----------------------------------------------------------------------
.../client/proxy/KvStateClientProxyHandler.java | 2 +-
.../client/proxy/KvStateClientProxyImpl.java | 2 +-
.../flink/runtime/concurrent/FutureUtils.java | 16 ----------------
3 files changed, 2 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/747cf821/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index d434336..73ef7f3 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -169,7 +169,7 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
final InetSocketAddress serverAddress = location.getKvStateServerAddress(keyGroupIndex);
if (serverAddress == null) {
- return FutureUtils.getFailedFuture(new UnknownKvStateKeyGroupLocationException(getServerName()));
+ return FutureUtils.completedExceptionally(new UnknownKvStateKeyGroupLocationException(getServerName()));
} else {
// Query server
final KvStateID kvStateId = location.getKvStateID(keyGroupIndex);
http://git-wip-us.apache.org/repos/asf/flink/blob/747cf821/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
index f473443..6fcaf40 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -43,7 +43,7 @@ import java.util.concurrent.CompletableFuture;
public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> implements KvStateClientProxy {
private static final CompletableFuture<ActorGateway> UNKNOWN_JOB_MANAGER =
- FutureUtils.getFailedFuture(new UnknownJobManagerException());
+ FutureUtils.completedExceptionally(new UnknownJobManagerException());
/** Number of threads used to process incoming requests. */
private final int queryExecutorThreads;
http://git-wip-us.apache.org/repos/asf/flink/blob/747cf821/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 50ef543..b982c8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -445,20 +445,4 @@ public class FutureUtils {
return result;
}
-
- // ------------------------------------------------------------------------
- // Future Completed with an exception.
- // ------------------------------------------------------------------------
-
- /**
- * Returns a {@link CompletableFuture} that has failed with the exception
- * provided as argument.
- * @param throwable the exception to fail the future with.
- * @return The failed future.
- */
- public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
- CompletableFuture<T> failedAttempt = new CompletableFuture<>();
- failedAttempt.completeExceptionally(throwable);
- return failedAttempt;
- }
}
[2/2] flink git commit: [FLINK-7940] Add FutureUtils.orTimeout
Posted by tr...@apache.org.
[FLINK-7940] Add FutureUtils.orTimeout
This commit adds a convenience function that makes it easy to add a timeout to
a CompletableFuture.
This closes #4918.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c568aed1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c568aed1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c568aed1
Branch: refs/heads/master
Commit: c568aed1ecb7675a2776e15f120e45ad576eeb37
Parents: 747cf82
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Oct 29 16:38:53 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 31 00:08:53 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/concurrent/FutureUtils.java | 67 ++++++++++++++++++++
.../runtime/concurrent/FutureUtilsTest.java | 18 ++++++
2 files changed, 85 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c568aed1/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index b982c8e..c18068b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.concurrent;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.Preconditions;
import akka.dispatch.OnComplete;
@@ -31,7 +32,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Supplier;
@@ -445,4 +448,68 @@ public class FutureUtils {
return result;
}
+
+ /**
+ * Times the given future out after the timeout.
+ *
+ * @param future to time out
+ * @param timeout after which the given future is timed out
+ * @param timeUnit time unit of the timeout
+ * @param <T> type of the given future
+ * @return The timeout enriched future
+ */
+ public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+ final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+ future.whenComplete((T value, Throwable throwable) -> {
+ if (!timeoutFuture.isDone()) {
+ timeoutFuture.cancel(false);
+ }
+ });
+
+ return future;
+ }
+
+ /**
+ * Runnable to complete the given future with a {@link TimeoutException}.
+ */
+ private static final class Timeout implements Runnable {
+
+ private final CompletableFuture<?> future;
+
+ private Timeout(CompletableFuture<?> future) {
+ this.future = Preconditions.checkNotNull(future);
+ }
+
+ @Override
+ public void run() {
+ future.completeExceptionally(new TimeoutException());
+ }
+ }
+
+ /**
+ * Delay scheduler used to timeout futures.
+ *
+ * <p>This class creates a singleton scheduler used to run the provided actions.
+ */
+ private static final class Delayer {
+ static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
+ 1,
+ new ExecutorThreadFactory("FlinkCompletableFutureDelayScheduler"));
+
+ /**
+ * Delay the given action by the given delay.
+ *
+ * @param runnable to execute after the given delay
+ * @param delay after which to execute the runnable
+ * @param timeUnit time unit of the delay
+ * @return Future of the scheduled action
+ */
+ private static ScheduledFuture<?> delay(Runnable runnable, long delay, TimeUnit timeUnit) {
+ Preconditions.checkNotNull(runnable);
+ Preconditions.checkNotNull(timeUnit);
+
+ return delayer.schedule(runnable, delay, timeUnit);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c568aed1/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index b779bc9..eb0ce2a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -223,4 +224,21 @@ public class FutureUtilsTest extends TestLogger {
assertTrue(retryFuture.isCancelled());
verify(scheduledFutureMock).cancel(anyBoolean());
}
+
+ /**
+ * Tests that a future is timed out after the specified timeout.
+ */
+ @Test
+ public void testOrTimeout() throws Exception {
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ final long timeout = 10L;
+
+ FutureUtils.orTimeout(future, timeout, TimeUnit.MILLISECONDS);
+
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
+ }
+ }
}