You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/10 18:03:47 UTC
[1/2] flink git commit: [hotfix] [core] Improve FlinkFuture
synchronous actions by avoiding creation of ExecutionContext
Repository: flink
Updated Branches:
refs/heads/master f6709b4a4 -> ff3786663
[hotfix] [core] Improve FlinkFuture synchronous actions by avoiding creation of ExecutionContext
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff378666
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff378666
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff378666
Branch: refs/heads/master
Commit: ff3786663b7f1f8a09b5ad0666f55fb171d7f64c
Parents: 61d7f15
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 10 13:16:13 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 18:50:11 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/concurrent/impl/FlinkFuture.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ff378666/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index dd7e8de..ab23fc5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -24,6 +24,7 @@ import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import akka.dispatch.Recover;
import akka.japi.Procedure;
+
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.CompletableFuture;
@@ -31,8 +32,10 @@ import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.Option;
import scala.Tuple2;
import scala.concurrent.Await;
@@ -59,6 +62,12 @@ public class FlinkFuture<T> implements Future<T> {
private static final Logger LOG = LoggerFactory.getLogger(FlinkFuture.class);
+ private static final Executor DIRECT_EXECUTOR = Executors.directExecutor();
+
+ private static final ExecutionContext DIRECT_EXECUTION_CONTEXT = executionContextFromExecutor(DIRECT_EXECUTOR);
+
+ // ------------------------------------------------------------------------
+
protected scala.concurrent.Future<T> scalaFuture;
FlinkFuture() {
@@ -346,6 +355,14 @@ public class FlinkFuture<T> implements Future<T> {
//-----------------------------------------------------------------------------------
private static ExecutionContext createExecutionContext(final Executor executor) {
+ if (executor == DIRECT_EXECUTOR) {
+ return DIRECT_EXECUTION_CONTEXT;
+ } else {
+ return executionContextFromExecutor(executor);
+ }
+ }
+
+ private static ExecutionContext executionContextFromExecutor(final Executor executor) {
return ExecutionContexts$.MODULE$.fromExecutor(executor, new Procedure<Throwable>() {
@Override
public void apply(Throwable throwable) throws Exception {
[2/2] flink git commit: [hotfix] [core] Add tests for Futures
applying multiple functions
Posted by se...@apache.org.
[hotfix] [core] Add tests for Futures applying multiple functions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61d7f15d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61d7f15d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61d7f15d
Branch: refs/heads/master
Commit: 61d7f15dc8500bc5350508fcbe47a0873452857b
Parents: f6709b4
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Feb 10 13:14:21 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Feb 10 18:50:11 2017 +0100
----------------------------------------------------------------------
.../runtime/concurrent/FlinkFutureTest.java | 82 ++++++++++++++++++++
1 file changed, 82 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/61d7f15d/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
index 25d010b..0bdc563 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FlinkFutureTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.concurrent;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -454,6 +456,86 @@ public class FlinkFutureTest extends TestLogger {
assertEquals(expectedLeftValue + expectedRightValue, result);
}
+ /**
+ * Tests that multiple functions can be called on complete futures.
+ */
+ @Test(timeout = 10000L)
+ public void testMultipleFunctionsOnCompleteFuture() throws Exception {
+ final FlinkCompletableFuture<String> future = FlinkCompletableFuture.completed("test");
+
+ Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
+
+ @Override
+ public String apply(String s, Throwable throwable) {
+ return s != null ? s : throwable.getMessage();
+ }
+ }, executor);
+
+ Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
+ @Override
+ public void accept(String value) {}
+ }, executor);
+
+ assertEquals("test", result1.get());
+ assertNull(result2.get());
+ }
+
+ /**
+ * Tests that multiple functions can be called on incomplete futures.
+ */
+ @Test(timeout = 10000L)
+ public void testMultipleFunctionsOnIncompleteFuture() throws Exception {
+ final FlinkCompletableFuture<String> future = new FlinkCompletableFuture<>();
+
+ Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
+ @Override
+ public String apply(String s, Throwable throwable) {
+ return s != null ? s : throwable.getMessage();
+ }
+ }, executor);
+
+ Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
+ @Override
+ public void accept(String value) {}
+ }, executor);
+
+ future.complete("value");
+
+ assertEquals("value", result1.get());
+ assertNull(result2.get());
+ }
+
+ /**
+ * Tests that multiple functions can be called on complete futures.
+ */
+ @Test(timeout = 10000)
+ public void testMultipleFunctionsExceptional() throws Exception {
+ final FlinkCompletableFuture<String> future = new FlinkCompletableFuture<>();
+
+ Future<String> result1 = future.handleAsync(new BiFunction<String, Throwable, String>() {
+ @Override
+ public String apply(String s, Throwable throwable) {
+ return s != null ? s : throwable.getMessage();
+ }
+ }, executor);
+
+ Future<Void> result2 = future.thenAcceptAsync(new AcceptFunction<String>() {
+ @Override
+ public void accept(String value) {}
+ }, executor);
+
+ future.completeExceptionally(new TestException("test"));
+
+ assertEquals("test", result1.get());
+
+ try {
+ result2.get();
+ fail("We should have caught an ExecutionException.");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof TestException);
+ }
+ }
+
private static class TestException extends RuntimeException {
private static final long serialVersionUID = -1274022962838535130L;