You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:38 UTC

[33/50] [abbrv] flink git commit: [FLINK-4658] [rpc] Allow RpcService to execute Runnables and Callables in its executor

[FLINK-4658] [rpc] Allow RpcService to execute Runnables and Callables in its executor

This closes #2531.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b22d661
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b22d661
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b22d661

Branch: refs/heads/flip-6
Commit: 9b22d6610650cbc14e540b067467a80648ecd982
Parents: 88a05f5
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Sep 21 18:16:27 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:45 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcService.java    | 31 ++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 14 ++++++
 .../flink/runtime/rpc/AsyncCallsTest.java       |  1 -
 .../runtime/rpc/TestingSerialRpcService.java    | 16 +++++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    | 45 ++++++++++++++++++++
 5 files changed, 106 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9b22d661/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index a367ff2..437e08b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
@@ -89,4 +90,34 @@ public interface RpcService {
 	 * @param delay    The delay after which the runnable will be executed
 	 */
 	void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
+
+	/**
+	 * Execute the given runnable in the executor of the RPC service. This method can be used to run
+	 * code outside of the main thread of a {@link RpcEndpoint}.
+	 *
+	 * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
+	 * any concurrent invocations and is therefore not suitable to run completion methods of futures
+	 * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
+	 * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
+	 * {@code RpcEndpoint}.
+	 *
+	 * @param runnable to execute
+	 */
+	void execute(Runnable runnable);
+
+	/**
+	 * Execute the given callable and return its result as a {@link Future}. This method can be used
+	 * to run code outside of the main thread of a {@link RpcEndpoint}.
+	 *
+	 * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against
+	 * any concurrent invocations and is therefore not suitable to run completion methods of futures
+	 * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
+	 * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
+	 * {@code RpcEndpoint}.
+	 *
+	 * @param callable to execute
+	 * @param <T> is the return value type
+	 * @return Future containing the callable's future result
+	 */
+	<T> Future<T> execute(Callable<T> callable);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b22d661/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 36f1115..cee19c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -25,6 +25,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 
 import akka.pattern.Patterns;
@@ -48,6 +49,7 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
@@ -223,4 +225,16 @@ public class AkkaRpcService implements RpcService {
 
 		actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher());
 	}
+
+	@Override
+	public void execute(Runnable runnable) {
+		actorSystem.dispatcher().execute(runnable);
+	}
+
+	@Override
+	public <T> Future<T> execute(Callable<T> callable) {
+		scala.concurrent.Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher());
+
+		return new FlinkFuture<>(scalaFuture);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b22d661/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 7c6b0ee..e8255d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rpc;
 
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;

http://git-wip-us.apache.org/repos/asf/flink/blob/9b22d661/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 957453a..c58ea20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -65,6 +65,22 @@ public class TestingSerialRpcService implements RpcService {
 	}
 
 	@Override
+	public void execute(Runnable runnable) {
+		runnable.run();
+	}
+
+	@Override
+	public <T> Future<T> execute(Callable<T> callable) {
+		try {
+			T result = callable.call();
+
+			return FlinkCompletableFuture.completed(result);
+		} catch (Exception e) {
+			return FlinkCompletableFuture.completedExceptionally(e);
+		}
+	}
+
+	@Override
 	public Executor getExecutor() {
 		return executorService;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b22d661/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 4e9e518..5550cb5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -22,13 +22,18 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class AkkaRpcServiceTest extends TestLogger {
@@ -70,4 +75,44 @@ public class AkkaRpcServiceTest extends TestLogger {
 
 		assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay);
 	}
+
+	/**
+	 * Tests that the {@link AkkaRpcService} can execute runnables
+	 */
+	@Test
+	public void testExecuteRunnable() throws Exception {
+		final OneShotLatch latch = new OneShotLatch();
+
+		akkaRpcService.execute(new Runnable() {
+			@Override
+			public void run() {
+				latch.trigger();
+			}
+		});
+
+		latch.await(30L, TimeUnit.SECONDS);
+	}
+
+	/**
+	 * Tests that the {@link AkkaRpcService} can execute callables and returns their result as
+	 * a {@link Future}.
+	 */
+	@Test
+	public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException {
+		final OneShotLatch latch = new OneShotLatch();
+		final int expected = 42;
+
+		Future<Integer> result = akkaRpcService.execute(new Callable<Integer>() {
+			@Override
+			public Integer call() throws Exception {
+				latch.trigger();
+				return expected;
+			}
+		});
+
+		int actual = result.get(30L, TimeUnit.SECONDS);
+
+		assertEquals(expected, actual);
+		assertTrue(latch.isTriggered());
+	}
 }