You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/30 23:09:55 UTC

[1/2] flink git commit: [hotfix] Remove redundant FutureUtils#getFailedFuture

Repository: flink
Updated Branches:
  refs/heads/master 152f6c9af -> c568aed1e


[hotfix] Remove redundant FutureUtils#getFailedFuture

FutureUtils#completedExceptionally does exactly the same.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/747cf821
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/747cf821
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/747cf821

Branch: refs/heads/master
Commit: 747cf821f96eb559c28af39e2150a134224214b1
Parents: 152f6c9
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Oct 29 16:01:18 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 31 00:08:52 2017 +0100

----------------------------------------------------------------------
 .../client/proxy/KvStateClientProxyHandler.java     |  2 +-
 .../client/proxy/KvStateClientProxyImpl.java        |  2 +-
 .../flink/runtime/concurrent/FutureUtils.java       | 16 ----------------
 3 files changed, 2 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/747cf821/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index d434336..73ef7f3 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -169,7 +169,7 @@ public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequ
 
 					final InetSocketAddress serverAddress = location.getKvStateServerAddress(keyGroupIndex);
 					if (serverAddress == null) {
-						return FutureUtils.getFailedFuture(new UnknownKvStateKeyGroupLocationException(getServerName()));
+						return FutureUtils.completedExceptionally(new UnknownKvStateKeyGroupLocationException(getServerName()));
 					} else {
 						// Query server
 						final KvStateID kvStateId = location.getKvStateID(keyGroupIndex);

http://git-wip-us.apache.org/repos/asf/flink/blob/747cf821/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
index f473443..6fcaf40 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -43,7 +43,7 @@ import java.util.concurrent.CompletableFuture;
 public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> implements KvStateClientProxy {
 
 	private static final CompletableFuture<ActorGateway> UNKNOWN_JOB_MANAGER =
-			FutureUtils.getFailedFuture(new UnknownJobManagerException());
+			FutureUtils.completedExceptionally(new UnknownJobManagerException());
 
 	/** Number of threads used to process incoming requests. */
 	private final int queryExecutorThreads;

http://git-wip-us.apache.org/repos/asf/flink/blob/747cf821/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 50ef543..b982c8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -445,20 +445,4 @@ public class FutureUtils {
 
 		return result;
 	}
-
-	// ------------------------------------------------------------------------
-	//  Future Completed with an exception.
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns a {@link CompletableFuture} that has failed with the exception
-	 * provided as argument.
-	 * @param throwable the exception to fail the future with.
-	 * @return The failed future.
-	 */
-	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
-		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
-		failedAttempt.completeExceptionally(throwable);
-		return failedAttempt;
-	}
 }


[2/2] flink git commit: [FLINK-7940] Add FutureUtils.orTimeout

Posted by tr...@apache.org.
[FLINK-7940] Add FutureUtils.orTimeout

This commit adds a convenience function that makes it easy to add a timeout to
a CompletableFuture.

This closes #4918.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c568aed1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c568aed1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c568aed1

Branch: refs/heads/master
Commit: c568aed1ecb7675a2776e15f120e45ad576eeb37
Parents: 747cf82
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Oct 29 16:38:53 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 31 00:08:53 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 67 ++++++++++++++++++++
 .../runtime/concurrent/FutureUtilsTest.java     | 18 ++++++
 2 files changed, 85 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c568aed1/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index b982c8e..c18068b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.concurrent;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.Preconditions;
 
 import akka.dispatch.OnComplete;
@@ -31,7 +32,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Supplier;
@@ -445,4 +448,68 @@ public class FutureUtils {
 
 		return result;
 	}
+
+	/**
+	 * Fails the given future with a {@link TimeoutException} unless it completes within the timeout.
+	 *
+	 * @param future future to time out; it is completed exceptionally in place, not copied
+	 * @param timeout delay after which the given future is timed out
+	 * @param timeUnit time unit of the timeout
+	 * @param <T> value type of the given future
+	 * @return the same future instance that was passed in
+	 */
+	public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
+		final ScheduledFuture<?> timeoutFuture = Delayer.delay(new Timeout(future), timeout, timeUnit);
+
+		future.whenComplete((T value, Throwable throwable) -> {
+			if (!timeoutFuture.isDone()) {
+				timeoutFuture.cancel(false); // future finished first, so the pending timeout action is no longer needed
+			}
+		});
+
+		return future;
+	}
+
+	/**
+	 * Runnable that, when run, completes the given future exceptionally with a {@link TimeoutException}.
+	 */
+	private static final class Timeout implements Runnable {
+
+		private final CompletableFuture<?> future;
+
+		private Timeout(CompletableFuture<?> future) {
+			this.future = Preconditions.checkNotNull(future);
+		}
+
+		@Override
+		public void run() {
+			future.completeExceptionally(new TimeoutException()); // has no effect if the future is already completed
+		}
+	}
+
+	/**
+	 * Delay scheduler used to time out futures.
+	 *
+	 * <p>This class holds one shared scheduler (core pool size 1) that runs all delayed actions.
+	 */
+	private static final class Delayer {
+		static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(
+			1,
+			new ExecutorThreadFactory("FlinkCompletableFutureDelayScheduler"));
+
+		/**
+		 * Schedules the given action on the shared scheduler to run once after the given delay.
+		 *
+		 * @param runnable action to execute after the given delay; must not be null
+		 * @param delay time to wait before executing the runnable
+		 * @param timeUnit time unit of the delay; must not be null
+		 * @return future of the scheduled action, which can be used to cancel it
+		 */
+		private static ScheduledFuture<?> delay(Runnable runnable, long delay, TimeUnit timeUnit) {
+			Preconditions.checkNotNull(runnable);
+			Preconditions.checkNotNull(timeUnit);
+
+			return delayer.schedule(runnable, delay, timeUnit);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c568aed1/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index b779bc9..eb0ce2a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -223,4 +224,21 @@ public class FutureUtilsTest extends TestLogger {
 		assertTrue(retryFuture.isCancelled());
 		verify(scheduledFutureMock).cancel(anyBoolean());
 	}
+
+	/**
+	 * Tests that {@link FutureUtils#orTimeout} fails a pending future with a {@link TimeoutException}.
+	 */
+	@Test
+	public void testOrTimeout() throws Exception {
+		final CompletableFuture<String> future = new CompletableFuture<>();
+		final long timeout = 10L;
+
+		FutureUtils.orTimeout(future, timeout, TimeUnit.MILLISECONDS);
+
+		try {
+			future.get(); // nothing else completes this future, so this blocks until the timeout fires
+		} catch (ExecutionException e) {
+			assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
+		}
+	}
 }