You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/10 18:03:47 UTC

[1/2] flink git commit: [hotfix] [core] Improve FlinkFuture synchronous actions by avoiding creation of ExecutionContext

Repository: flink
Updated Branches:
  refs/heads/master f6709b4a4 -> ff3786663


[hotfix] [core] Improve FlinkFuture synchronous actions by avoiding creation of ExecutionContext


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff378666
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff378666
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff378666

Branch: refs/heads/master
Commit: ff3786663b7f1f8a09b5ad0666f55fb171d7f64c
Parents: 61d7f15
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 10 13:16:13 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 18:50:11 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/concurrent/impl/FlinkFuture.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff378666/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index dd7e8de..ab23fc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -24,6 +24,7 @@ import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import akka.dispatch.Recover;
 import akka.japi.Procedure;
+
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
@@ -31,8 +32,10 @@ import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.Option;
 import scala.Tuple2;
 import scala.concurrent.Await;
@@ -59,6 +62,12 @@ public class FlinkFuture<T> implements Future<T> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkFuture.class);
 
+	private static final Executor DIRECT_EXECUTOR = Executors.directExecutor();
+
+	private static final ExecutionContext DIRECT_EXECUTION_CONTEXT = executionContextFromExecutor(DIRECT_EXECUTOR);
+
+	// ------------------------------------------------------------------------
+
 	protected scala.concurrent.Future<T> scalaFuture;
 
 	FlinkFuture() {
@@ -346,6 +355,14 @@ public class FlinkFuture<T> implements Future<T> {
 	//-----------------------------------------------------------------------------------
 
 	private static ExecutionContext createExecutionContext(final Executor executor) {
+		if (executor == DIRECT_EXECUTOR) {
+			return DIRECT_EXECUTION_CONTEXT;
+		} else {
+			return executionContextFromExecutor(executor);
+		}
+	}
+
+	private static ExecutionContext executionContextFromExecutor(final Executor executor) {
 		return ExecutionContexts$.MODULE$.fromExecutor(executor, new Procedure<Throwable>() {
 			@Override
 			public void apply(Throwable throwable) throws Exception {


[2/2] flink git commit: [hotfix] [core] Add tests for Futures applying multiple functions

Posted by se...@apache.org.
[hotfix] [core] Add tests for Futures applying multiple functions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61d7f15d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61d7f15d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61d7f15d

Branch: refs/heads/master
Commit: 61d7f15dc8500bc5350508fcbe47a0873452857b
Parents: f6709b4
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 10 13:14:21 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 18:50:11 2017 +0100

----------------------------------------------------------------------
 .../runtime/concurrent/FlinkFutureTest.java     | 82 ++++++++++++++++++++
 1 file changed, 82 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/61d7f15d/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
index 25d010b..0bdc563 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.concurrent;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -454,6 +456,86 @@ public class FlinkFutureTest extends TestLogger {
 		assertEquals(expectedLeftValue + expectedRightValue, result);
 	}
 
+	/**
+	 * Tests that multiple functions can be called on complete futures.
+	 */
+	@Test(timeout = 10000L)
+	public void testMultipleFunctionsOnCompleteFuture() throws Exception {
+		final FlinkCompletableFuture<String> future = FlinkCompletableFuture.completed("test");
+
+		Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
+
+			@Override
+			public String apply(String s, Throwable throwable) {
+				return s != null ? s : throwable.getMessage();
+			}
+		}, executor);
+
+		Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
+			@Override
+			public void accept(String value) {}
+		}, executor);
+
+		assertEquals("test", result1.get());
+		assertNull(result2.get());
+	}
+
+	/**
+	 * Tests that multiple functions can be called on incomplete futures.
+	 */
+	@Test(timeout = 10000L)
+	public void testMultipleFunctionsOnIncompleteFuture() throws Exception {
+		final FlinkCompletableFuture<String> future = new FlinkCompletableFuture<>();
+
+		Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
+			@Override
+			public String apply(String s, Throwable throwable) {
+				return s != null ? s : throwable.getMessage();
+			}
+		}, executor);
+
+		Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
+			@Override
+			public void accept(String value) {}
+		}, executor);
+
+		future.complete("value");
+
+		assertEquals("value", result1.get());
+		assertNull(result2.get());
+	}
+
+	/**
+	 * Tests that multiple functions can be called on exceptionally completed futures.
+	 */
+	@Test(timeout = 10000)
+	public void testMultipleFunctionsExceptional() throws Exception {
+		final FlinkCompletableFuture<String> future = new FlinkCompletableFuture<>();
+
+		Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
+			@Override
+			public String apply(String s, Throwable throwable) {
+				return s != null ? s : throwable.getMessage();
+			}
+		}, executor);
+
+		Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
+			@Override
+			public void accept(String value) {}
+		}, executor);
+
+		future.completeExceptionally(new TestException("test"));
+
+		assertEquals("test", result1.get());
+
+		try {
+			result2.get();
+			fail("We should have caught an ExecutionException.");
+		} catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof TestException);
+		}
+	}
+
 	private static class TestException extends RuntimeException {
 
 		private static final long serialVersionUID = -1274022962838535130L;