You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/28 08:21:24 UTC
[41/50] [abbrv] flink git commit: [FLINK-4658] [rpc] Allow RpcService
to execute Runnables and Callables in its executor
[FLINK-4658] [rpc] Allow RpcService to execute Runnables and Callables in its executor
This closes #2531.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6159f56a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6159f56a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6159f56a
Branch: refs/heads/flip-6
Commit: 6159f56a16d407f4a3bf74cdbc578a705e3e063b
Parents: 103e44c
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 21 18:16:27 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 27 19:25:00 2016 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/rpc/RpcService.java | 31 ++++++++++++++
.../flink/runtime/rpc/akka/AkkaRpcService.java | 14 ++++++
.../flink/runtime/rpc/AsyncCallsTest.java | 1 -
.../runtime/rpc/TestingSerialRpcService.java | 16 +++++++
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 45 ++++++++++++++++++++
5 files changed, 106 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6159f56a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index a367ff2..437e08b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -89,4 +90,34 @@ public interface RpcService {
* @param delay The delay after which the runnable will be executed
*/
void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
+
+ /**
+ * Execute the given runnable in the executor of the RPC service. This method can be used to run
+ * code outside of the main thread of a {@link RpcEndpoint}.
+ *
+ * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
+ * any concurrent invocations and is therefore not suitable to run completion methods of futures
+ * that modify the state of an {@link RpcEndpoint}. For such operations, one needs to use the
+ * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
+ * {@code RpcEndpoint}.
+ *
+ * @param runnable Runnable to execute in the executor of the RPC service
+ */
+ void execute(Runnable runnable);
+
+ /**
+ * Execute the given callable and return its result as a {@link Future}. This method can be used
+ * to run code outside of the main thread of a {@link RpcEndpoint}.
+ *
+ * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
+ * any concurrent invocations and is therefore not suitable to run completion methods of futures
+ * that modify the state of an {@link RpcEndpoint}. For such operations, one needs to use the
+ * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
+ * {@code RpcEndpoint}.
+ *
+ * @param callable Callable to execute; its return value becomes the value of the returned future
+ * @param <T> Type of the value returned by the callable
+ * @return Future containing the callable's future result
+ */
+ <T> Future<T> execute(Callable<T> callable);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6159f56a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 36f1115..cee19c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -25,6 +25,7 @@ import akka.actor.ActorSystem;
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
@@ -48,6 +49,7 @@ import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -223,4 +225,16 @@ public class AkkaRpcService implements RpcService {
actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher());
}
+
+ @Override
+ public void execute(Runnable runnable) {
+ actorSystem.dispatcher().execute(runnable); // hand the runnable to the actor system's dispatcher
+ }
+
+ @Override
+ public <T> Future<T> execute(Callable<T> callable) {
+ scala.concurrent.Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher()); // submit the callable to the actor system's dispatcher
+
+ return new FlinkFuture<>(scalaFuture); // wrap the Scala future in a FlinkFuture to satisfy the Future<T> return type
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6159f56a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 7c6b0ee..e8255d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.rpc;
import akka.actor.ActorSystem;
-import akka.util.Timeout;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
http://git-wip-us.apache.org/repos/asf/flink/blob/6159f56a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 957453a..c58ea20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -65,6 +65,22 @@ public class TestingSerialRpcService implements RpcService {
}
@Override
+ public void execute(Runnable runnable) {
+ runnable.run(); // invoked directly in the calling thread; nothing is handed to an executor
+ }
+
+ @Override
+ public <T> Future<T> execute(Callable<T> callable) {
+ try {
+ T result = callable.call(); // invoked directly in the calling thread
+
+ return FlinkCompletableFuture.completed(result); // hand the computed value back through the future
+ } catch (Exception e) {
+ return FlinkCompletableFuture.completedExceptionally(e); // report the callable's failure through the future instead of throwing
+ }
+ }
+
+ @Override
public Executor getExecutor() {
return executorService;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6159f56a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 4e9e518..5550cb5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -22,13 +22,18 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Test;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class AkkaRpcServiceTest extends TestLogger {
@@ -70,4 +75,44 @@ public class AkkaRpcServiceTest extends TestLogger {
assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
}
+
+ /**
+ * Tests that the {@link AkkaRpcService} can execute runnables.
+ */
+ @Test
+ public void testExecuteRunnable() throws Exception {
+ final OneShotLatch latch = new OneShotLatch();
+
+ akkaRpcService.execute(new Runnable() {
+ @Override
+ public void run() {
+ latch.trigger(); // signals the waiting test thread that the runnable was executed
+ }
+ });
+
+ latch.await(30L, TimeUnit.SECONDS); // NOTE(review): presumably throws on timeout; confirm, since this is the test's only check
+ }
+
+ /**
+ * Tests that the {@link AkkaRpcService} can execute callables and returns their result as
+ * a {@link Future}.
+ */
+ @Test
+ public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException {
+ final OneShotLatch latch = new OneShotLatch();
+ final int expected = 42;
+
+ Future<Integer> result = akkaRpcService.execute(new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ latch.trigger(); // records that the callable body was actually entered
+ return expected;
+ }
+ });
+
+ int actual = result.get(30L, TimeUnit.SECONDS); // wait at most 30 seconds for the callable's value
+
+ assertEquals(expected, actual); // the future must carry the value returned by the callable
+ assertTrue(latch.isTriggered()); // the callable must have run, not just produced a value some other way
+ }
}