You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/10 18:03:48 UTC

[2/2] flink git commit: [hotfix] [core] Add tests for Futures applying multiple functions

[hotfix] [core] Add tests for Futures applying multiple functions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61d7f15d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61d7f15d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61d7f15d

Branch: refs/heads/master
Commit: 61d7f15dc8500bc5350508fcbe47a0873452857b
Parents: f6709b4
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 10 13:14:21 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 18:50:11 2017 +0100

----------------------------------------------------------------------
 .../runtime/concurrent/FlinkFutureTest.java     | 82 ++++++++++++++++++++
 1 file changed, 82 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/61d7f15d/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
index 25d010b..0bdc563 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.concurrent;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -454,6 +456,86 @@ public class FlinkFutureTest extends TestLogger {
 		assertEquals(expectedLeftValue + expectedRightValue, result);
 	}
 
+	/**
+	 * Tests that multiple functions can be called on complete futures.
+	 */
+	@Test(timeout = 10000L)
+	public void testMultipleFunctionsOnCompleteFuture() throws Exception {
+		final FlinkCompletableFuture<String> future = FlinkCompletableFuture.completed("test");
+
+		Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
+
+			@Override
+			public String apply(String s, Throwable throwable) {
+				return s != null ? s : throwable.getMessage();
+			}
+		}, executor);
+
+		Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
+			@Override
+			public void accept(String value) {}
+		}, executor);
+
+		assertEquals("test", result1.get());
+		assertNull(result2.get());
+	}
+
+	/**
+	 * Tests that multiple functions can be called on incomplete futures.
+	 */
+	@Test(timeout = 10000L)
+	public void testMultipleFunctionsOnIncompleteFuture() throws Exception {
+		final FlinkCompletableFuture<String> future = new FlinkCompletableFuture<>();
+
+		Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
+			@Override
+			public String apply(String s, Throwable throwable) {
+				return s != null ? s : throwable.getMessage();
+			}
+		}, executor);
+
+		Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
+			@Override
+			public void accept(String value) {}
+		}, executor);
+
+		future.complete("value");
+
+		assertEquals("value", result1.get());
+		assertNull(result2.get());
+	}
+
+	/**
+	 * Tests that multiple functions can be called on futures that complete exceptionally.
+	 */
+	@Test(timeout = 10000)
+	public void testMultipleFunctionsExceptional() throws Exception {
+		final FlinkCompletableFuture<String> future = new FlinkCompletableFuture<>();
+
+		Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
+			@Override
+			public String apply(String s, Throwable throwable) {
+				return s != null ? s : throwable.getMessage();
+			}
+		}, executor);
+
+		Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
+			@Override
+			public void accept(String value) {}
+		}, executor);
+
+		future.completeExceptionally(new TestException("test"));
+
+		assertEquals("test", result1.get());
+
+		try {
+			result2.get();
+			fail("We should have caught an ExecutionException.");
+		} catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof TestException);
+		}
+	}
+
 	private static class TestException extends RuntimeException {
 
 		private static final long serialVersionUID = -1274022962838535130L;