You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/10 18:03:48 UTC
[2/2] flink git commit: [hotfix] [core] Add tests for Futures applying multiple functions
[hotfix] [core] Add tests for Futures applying multiple functions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61d7f15d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61d7f15d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61d7f15d
Branch: refs/heads/master
Commit: 61d7f15dc8500bc5350508fcbe47a0873452857b
Parents: f6709b4
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 10 13:14:21 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 18:50:11 2017 +0100
----------------------------------------------------------------------
.../runtime/concurrent/FlinkFutureTest.java | 82 ++++++++++++++++++++
1 file changed, 82 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/61d7f15d/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
index 25d010b..0bdc563 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.concurrent;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -454,6 +456,86 @@ public class FlinkFutureTest extends TestLogger {
assertEquals(expectedLeftValue + expectedRightValue, result);
}
+ /**
+ * Tests that multiple functions (handleAsync and thenAcceptAsync) can be applied to the same, already completed future.
+ */
+ @Test(timeout = 10000L)
+ public void testMultipleFunctionsOnCompleteFuture() throws Exception {
+ final FlinkCompletableFuture<String> future = FlinkCompletableFuture.completed("test");
+
+ Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
+
+ @Override
+ public String apply(String s, Throwable throwable) {
+ return s != null ? s : throwable.getMessage(); // the value if present, otherwise the failure's message
+ }
+ }, executor);
+
+ Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
+ @Override
+ public void accept(String value) {} // no-op: only the completion of the returned future is checked below
+ }, executor);
+
+ assertEquals("test", result1.get()); // the handler passes the completed value through
+ assertNull(result2.get()); // the Future<Void> is expected to yield null
+ }
+
+ /**
+ * Tests that multiple functions can be registered on a not yet completed future and all produce results once it is completed.
+ */
+ @Test(timeout = 10000L)
+ public void testMultipleFunctionsOnIncompleteFuture() throws Exception {
+ final FlinkCompletableFuture<String> future = new FlinkCompletableFuture<>();
+
+ Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
+ @Override
+ public String apply(String s, Throwable throwable) {
+ return s != null ? s : throwable.getMessage(); // the value if present, otherwise the failure's message
+ }
+ }, executor);
+
+ Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
+ @Override
+ public void accept(String value) {} // no-op: only the completion of the returned future is checked below
+ }, executor);
+
+ future.complete("value"); // complete only after both functions have been registered
+
+ assertEquals("value", result1.get());
+ assertNull(result2.get()); // the Future<Void> is expected to yield null
+ }
+
+ /**
+ * Tests that multiple functions can be called on futures that are completed exceptionally.
+ */
+ @Test(timeout = 10000)
+ public void testMultipleFunctionsExceptional() throws Exception {
+ final FlinkCompletableFuture<String> future = new FlinkCompletableFuture<>();
+
+ Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
+ @Override
+ public String apply(String s, Throwable throwable) {
+ return s != null ? s : throwable.getMessage(); // the value if present, otherwise the failure's message
+ }
+ }, executor);
+
+ Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
+ @Override
+ public void accept(String value) {} // no-op: this test expects the returned future to fail instead
+ }, executor);
+
+ future.completeExceptionally(new TestException("test")); // fail the future after both functions are registered
+
+ assertEquals("test", result1.get()); // the handler is expected to turn the failure into its message
+
+ try {
+ result2.get(); // expected to throw, see the fail() on the next line
+ fail("We should have caught an ExecutionException.");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TestException); // the original failure must be the cause
+ }
+ }
+
private static class TestException extends RuntimeException {
private static final long serialVersionUID = -1274022962838535130L;