You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/27 13:42:09 UTC

flink git commit: [FLINK-4903] [futures] Introduce synchronous future operations

Repository: flink
Updated Branches:
  refs/heads/master b410c393c -> 1e1e4dc70


[FLINK-4903] [futures] Introduce synchronous future operations

The synchronous future operations are executed by the thread which adds the operation,
if the future is already completed, or otherwise by the thread executing the last operation.

Increase FlinkFutureTest timeouts to 10s

This closes #2689.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e1e4dc7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e1e4dc7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e1e4dc7

Branch: refs/heads/master
Commit: 1e1e4dc701d3606bdcaa291d2d85460fdfbb0dc7
Parents: b410c39
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Oct 25 10:31:53 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Oct 27 15:40:54 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/concurrent/Future.java |  67 ++++++++
 .../runtime/concurrent/impl/FlinkFuture.java    |  31 ++++
 .../runtime/concurrent/FlinkFutureTest.java     | 156 ++++++++++++++++---
 3 files changed, 235 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e1e4dc7/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
index 409c978..a6d5a48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java
@@ -99,6 +99,16 @@ public interface Future<T> {
 	<R> Future<R> thenApplyAsync(ApplyFunction<? super T, ? extends R> applyFunction, Executor executor);
 
 	/**
+	 * Applies the given function to the value of the future. The result of the apply function is
+	 * the value of the newly returned future.
+	 *
+	 * @param applyFunction function to apply to the future's value
+	 * @param <R> type of the apply function's return value
+	 * @return future representing the return value of the given apply function
+	 */
+	<R> Future<R> thenApply(ApplyFunction<? super T, ? extends R> applyFunction);
+
+	/**
 	 * Applies the accept function to the value of the future. Unlike the {@link ApplyFunction}, the
 	 * {@link AcceptFunction} does not return a value. The returned future, thus, represents only
 	 * the completion of the accept callback.
@@ -112,6 +122,16 @@ public interface Future<T> {
 	Future<Void> thenAcceptAsync(AcceptFunction<? super T> acceptFunction, Executor executor);
 
 	/**
+	 * Applies the accept function to the value of the future. Unlike the {@link ApplyFunction}, the
+	 * {@link AcceptFunction} does not return a value. The returned future, thus, represents only
+	 * the completion of the accept callback.
+	 *
+	 * @param acceptFunction function to apply to the future's value
+	 * @return future representing the completion of the accept callback
+	 */
+	Future<Void> thenAccept(AcceptFunction<? super T> acceptFunction);
+
+	/**
 	 * Applies the given function to the value of the future if the future has been completed
 	 * exceptionally. The completing exception is given to the apply function which can return a new
 	 * value which is the value of the returned future.
@@ -126,6 +146,17 @@ public interface Future<T> {
 	<R> Future<R> exceptionallyAsync(ApplyFunction<Throwable, ? extends R> exceptionallyFunction, Executor executor);
 
 	/**
+	 * Applies the given function to the value of the future if the future has been completed
+	 * exceptionally. The completing exception is given to the apply function which can return a new
+	 * value which is the value of the returned future.
+	 *
+	 * @param exceptionallyFunction function to apply to the future's value if it is an exception
+	 * @param <R> type of the apply function's return value
+	 * @return future representing the return value of the given apply function
+	 */
+	<R> Future<R> exceptionally(ApplyFunction<Throwable, ? extends R> exceptionallyFunction);
+
+	/**
 	 * Applies the given function to the value of the future. The apply function returns a future
 	 * result, which is flattened. This means that the resulting future of this method represents
 	 * the future's value of the apply function.
@@ -141,6 +172,18 @@ public interface Future<T> {
 	<R> Future<R> thenComposeAsync(ApplyFunction<? super T, ? extends Future<R>> composeFunction, Executor executor);
 
 	/**
+	 * Applies the given function to the value of the future. The apply function returns a future
+	 * result, which is flattened. This means that the resulting future of this method represents
+	 * the future's value of the apply function.
+	 *
+	 * @param composeFunction function to apply to the future's value. The function returns a future which is
+	 *                        flattened
+	 * @param <R> type of the returned future's value
+	 * @return future representing the flattened return value of the apply function
+	 */
+	<R> Future<R> thenCompose(ApplyFunction<? super T, ? extends Future<R>> composeFunction);
+
+	/**
 	 * Applies the given handle function to the result of the future. The result can either be the
 	 * future's value or the exception with which the future has been completed. The two cases are
 	 * mutually exclusive. This means that either the left or right argument of the handle function
@@ -156,6 +199,18 @@ public interface Future<T> {
 	<R> Future<R> handleAsync(BiFunction<? super T, Throwable, ? extends R> biFunction, Executor executor);
 
 	/**
+	 * Applies the given handle function to the result of the future. The result can either be the
+	 * future's value or the exception with which the future has been completed. The two cases are
+	 * mutually exclusive. This means that either the left or right argument of the handle function
+	 * is non-null. The result of the handle function is the returned future's value.
+	 *
+	 * @param biFunction applied to the result (normal and exceptional) of the future
+	 * @param <R> type of the handle function's return value
+	 * @return future representing the handle function's return value
+	 */
+	<R> Future<R> handle(BiFunction<? super T, Throwable, ? extends R> biFunction);
+
+	/**
 	 * Applies the given function to the result of this and the other future after both futures
 	 * have completed. The result of the bi-function is the result of the returned future.
 	 * <p>
@@ -169,4 +224,16 @@ public interface Future<T> {
 	 * @return future representing the bi-function's return value
 	 */
 	<U, R> Future<R> thenCombineAsync(Future<U> other, BiFunction<? super T, ? super U, ? extends R> biFunction, Executor executor);
+
+	/**
+	 * Applies the given function to the result of this and the other future after both futures
+	 * have completed. The result of the bi-function is the result of the returned future.
+	 *
+	 * @param other future whose result is the right input to the bi-function
+	 * @param biFunction applied to the result of this and that future
+	 * @param <U> type of that future's return value
+	 * @param <R> type of the bi-function's return value
+	 * @return future representing the bi-function's return value
+	 */
+	<U, R> Future<R> thenCombine(Future<U> other, BiFunction<? super T, ? super U, ? extends R> biFunction);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1e1e4dc7/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index 277f4fa..e881399 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -24,6 +24,7 @@ import akka.dispatch.Mapper;
 import akka.dispatch.Recover;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.util.Preconditions;
@@ -140,6 +141,11 @@ public class FlinkFuture<T> implements Future<T> {
 	}
 
 	@Override
+	public <R> Future<R> thenApply(final ApplyFunction<? super T, ? extends R> applyFunction) {
+		return thenApplyAsync(applyFunction, Executors.directExecutor());
+	}
+
+	@Override
 	public Future<Void> thenAcceptAsync(final AcceptFunction<? super T> acceptFunction, Executor executor) {
 		Preconditions.checkNotNull(scalaFuture);
 		Preconditions.checkNotNull(acceptFunction);
@@ -158,6 +164,11 @@ public class FlinkFuture<T> implements Future<T> {
 	}
 
 	@Override
+	public Future<Void> thenAccept(AcceptFunction<? super T> acceptFunction) {
+		return thenAcceptAsync(acceptFunction, Executors.directExecutor());
+	}
+
+	@Override
 	public <R> Future<R> exceptionallyAsync(final ApplyFunction<Throwable, ? extends R> exceptionallyFunction, Executor executor) {
 		Preconditions.checkNotNull(scalaFuture);
 		Preconditions.checkNotNull(exceptionallyFunction);
@@ -174,6 +185,11 @@ public class FlinkFuture<T> implements Future<T> {
 	}
 
 	@Override
+	public <R> Future<R> exceptionally(ApplyFunction<Throwable, ? extends R> exceptionallyFunction) {
+		return exceptionallyAsync(exceptionallyFunction, Executors.directExecutor());
+	}
+
+	@Override
 	public <R> Future<R> thenComposeAsync(final ApplyFunction<? super T, ? extends Future<R>> applyFunction, Executor executor) {
 		Preconditions.checkNotNull(scalaFuture);
 		Preconditions.checkNotNull(applyFunction);
@@ -215,6 +231,11 @@ public class FlinkFuture<T> implements Future<T> {
 	}
 
 	@Override
+	public <R> Future<R> thenCompose(ApplyFunction<? super T, ? extends Future<R>> composeFunction) {
+		return thenComposeAsync(composeFunction, Executors.directExecutor());
+	}
+
+	@Override
 	public <R> Future<R> handleAsync(final BiFunction<? super T, Throwable, ? extends R> biFunction, Executor executor) {
 		Preconditions.checkNotNull(scalaFuture);
 		Preconditions.checkNotNull(biFunction);
@@ -248,6 +269,11 @@ public class FlinkFuture<T> implements Future<T> {
 	}
 
 	@Override
+	public <R> Future<R> handle(BiFunction<? super T, Throwable, ? extends R> biFunction) {
+		return handleAsync(biFunction, Executors.directExecutor());
+	}
+
+	@Override
 	public <U, R> Future<R> thenCombineAsync(final Future<U> other, final BiFunction<? super T, ? super U, ? extends R> biFunction, final Executor executor) {
 		Preconditions.checkNotNull(other);
 		Preconditions.checkNotNull(biFunction);
@@ -288,6 +314,11 @@ public class FlinkFuture<T> implements Future<T> {
 		return new FlinkFuture<>(result);
 	}
 
+	@Override
+	public <U, R> Future<R> thenCombine(Future<U> other, BiFunction<? super T, ? super U, ? extends R> biFunction) {
+		return thenCombineAsync(other, biFunction, Executors.directExecutor());
+	}
+
 	//-----------------------------------------------------------------------------------
 	// Static factory methods
 	//-----------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1e1e4dc7/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
index 905f5b5..25d010b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
@@ -55,8 +55,8 @@ public class FlinkFutureTest extends TestLogger {
 		executor.shutdown();
 	}
 
-	@Test
-	public void testFutureApply() throws Exception {
+	@Test(timeout = 10000L)
+	public void testFutureApplyAsync() throws Exception {
 		int expectedValue = 42;
 
 		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
@@ -129,8 +129,8 @@ public class FlinkFutureTest extends TestLogger {
 		}
 	}
 
-	@Test
-	public void testExceptionally() throws ExecutionException, InterruptedException {
+	@Test(timeout = 10000L)
+	public void testExceptionallyAsync() throws ExecutionException, InterruptedException {
 		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
 		String exceptionMessage = "Foobar";
 
@@ -148,8 +148,8 @@ public class FlinkFutureTest extends TestLogger {
 		assertEquals(exceptionMessage, actualMessage);
 	}
 
-	@Test
-	public void testCompose() throws ExecutionException, InterruptedException {
+	@Test(timeout = 10000L)
+	public void testComposeAsync() throws ExecutionException, InterruptedException {
 		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
 
 		final int expectedValue = 42;
@@ -173,8 +173,8 @@ public class FlinkFutureTest extends TestLogger {
 		assertEquals(expectedValue, actualValue);
 	}
 
-	@Test
-	public void testCombine() throws ExecutionException, InterruptedException {
+	@Test(timeout = 10000L)
+	public void testCombineAsync() throws ExecutionException, InterruptedException {
 		CompletableFuture<Integer> leftFuture = new FlinkCompletableFuture<>();
 		CompletableFuture<String> rightFuture = new FlinkCompletableFuture<>();
 
@@ -197,8 +197,8 @@ public class FlinkFutureTest extends TestLogger {
 		assertEquals(expectedRightValue + expectedLeftValue, result);
 	}
 
-	@Test
-	public void testCombineLeftFailure() throws InterruptedException {
+	@Test(timeout = 10000L)
+	public void testCombineAsyncLeftFailure() throws InterruptedException {
 		CompletableFuture<Integer> leftFuture = new FlinkCompletableFuture<>();
 		CompletableFuture<String> rightFuture = new FlinkCompletableFuture<>();
 
@@ -224,8 +224,8 @@ public class FlinkFutureTest extends TestLogger {
 		}
 	}
 
-	@Test
-	public void testCombineRightFailure() throws ExecutionException, InterruptedException {
+	@Test(timeout = 10000L)
+	public void testCombineAsyncRightFailure() throws ExecutionException, InterruptedException {
 		CompletableFuture<Integer> leftFuture = new FlinkCompletableFuture<>();
 		CompletableFuture<String> rightFuture = new FlinkCompletableFuture<>();
 
@@ -260,8 +260,8 @@ public class FlinkFutureTest extends TestLogger {
 		assertEquals(new Integer(absentValue), initialFuture.getNow(absentValue));
 	}
 
-	@Test
-	public void testAccept() throws ExecutionException, InterruptedException {
+	@Test(timeout = 10000L)
+	public void testAcceptAsync() throws ExecutionException, InterruptedException {
 		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
 		final AtomicInteger atomicInteger = new AtomicInteger(0);
 		int expectedValue = 42;
@@ -280,8 +280,8 @@ public class FlinkFutureTest extends TestLogger {
 		assertEquals(expectedValue, atomicInteger.get());
 	}
 
-	@Test
-	public void testHandle() throws ExecutionException, InterruptedException {
+	@Test(timeout = 10000L)
+	public void testHandleAsync() throws ExecutionException, InterruptedException {
 		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
 		int expectedValue = 43;
 
@@ -301,8 +301,8 @@ public class FlinkFutureTest extends TestLogger {
 		assertEquals(String.valueOf(expectedValue), result.get());
 	}
 
-	@Test
-	public void testHandleException() throws ExecutionException, InterruptedException {
+	@Test(timeout = 10000L)
+	public void testHandleAsyncException() throws ExecutionException, InterruptedException {
 		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
 		String exceptionMessage = "foobar";
 
@@ -322,7 +322,7 @@ public class FlinkFutureTest extends TestLogger {
 		assertEquals(exceptionMessage, result.get());
 	}
 
-	@Test
+	@Test(timeout = 10000L)
 	public void testMultipleCompleteOperations() throws ExecutionException, InterruptedException {
 		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
 		int expectedValue = 42;
@@ -336,6 +336,124 @@ public class FlinkFutureTest extends TestLogger {
 		assertEquals(new Integer(expectedValue), initialFuture.get());
 	}
 
+	@Test
+	public void testApply() throws ExecutionException, InterruptedException {
+		int expectedValue = 42;
+
+		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+
+		Future<String> appliedFuture = initialFuture.thenApply(new ApplyFunction<Integer, String>() {
+			@Override
+			public String apply(Integer value) {
+				return String.valueOf(value);
+			}
+		});
+
+		initialFuture.complete(expectedValue);
+
+		assertEquals(String.valueOf(expectedValue), appliedFuture.get());
+	}
+
+	@Test
+	public void testAccept() throws ExecutionException, InterruptedException {
+		int expectedValue = 42;
+		Future<Integer> initialFuture = FlinkCompletableFuture.completed(expectedValue);
+		final AtomicInteger atomicInteger = new AtomicInteger(0);
+
+		Future<Void> result = initialFuture.thenAccept(new AcceptFunction<Integer>() {
+			@Override
+			public void accept(Integer value) {
+				atomicInteger.set(value);
+			}
+		});
+
+		result.get();
+
+		assertEquals(expectedValue, atomicInteger.get());
+	}
+
+	@Test
+	public void testExceptionally() throws ExecutionException, InterruptedException {
+		String exceptionMessage = "Foobar";
+		Future<Integer> initialFuture = FlinkCompletableFuture
+			.completedExceptionally(new TestException(exceptionMessage));
+
+
+		Future<String> recovered = initialFuture.exceptionally(new ApplyFunction<Throwable, String>() {
+			@Override
+			public String apply(Throwable value) {
+				return value.getMessage();
+			}
+		});
+
+		String actualMessage = recovered.get();
+
+		assertEquals(exceptionMessage, actualMessage);
+	}
+
+	@Test
+	public void testHandle() throws ExecutionException, InterruptedException {
+		int expectedValue = 43;
+		Future<Integer> initialFuture = FlinkCompletableFuture.completed(expectedValue);
+
+		Future<String> result = initialFuture.handle(new BiFunction<Integer, Throwable, String>() {
+			@Override
+			public String apply(Integer integer, Throwable throwable) {
+				if (integer != null) {
+					return String.valueOf(integer);
+				} else {
+					return throwable.getMessage();
+				}
+			}
+		});
+
+		assertEquals(String.valueOf(expectedValue), result.get());
+	}
+
+	@Test
+	public void testCompose() throws ExecutionException, InterruptedException {
+		CompletableFuture<Integer> initialFuture = new FlinkCompletableFuture<>();
+		final int expectedValue = 42;
+
+		Future<Integer> composedFuture = initialFuture.thenCompose(new ApplyFunction<Integer, Future<Integer>>() {
+			@Override
+			public Future<Integer> apply(Integer value) {
+				return FlinkFuture.supplyAsync(new Callable<Integer>() {
+					@Override
+					public Integer call() throws Exception {
+						return expectedValue;
+					}
+				}, executor);
+			}
+		});
+
+		initialFuture.complete(42);
+
+		int actualValue = composedFuture.get();
+
+		assertEquals(expectedValue, actualValue);
+	}
+
+	@Test
+	public void testCombine() throws ExecutionException, InterruptedException {
+		int expectedLeftValue = 1;
+		int expectedRightValue = 2;
+
+		Future<Integer> left = FlinkCompletableFuture.completed(expectedLeftValue);
+		Future<Integer> right = FlinkCompletableFuture.completed(expectedRightValue);
+
+		Future<Integer> sum = left.thenCombine(right, new BiFunction<Integer, Integer, Integer>() {
+			@Override
+			public Integer apply(Integer left, Integer right) {
+				return left + right;
+			}
+		});
+
+		int result = sum.get();
+
+		assertEquals(expectedLeftValue + expectedRightValue, result);
+	}
+
 	private static class TestException extends RuntimeException {
 
 		private static final long serialVersionUID = -1274022962838535130L;