You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/21 09:52:49 UTC
[16/50] [abbrv] flink git commit: [FLINK-4368] [distributed runtime]
Eagerly initialize the RPC endpoint members
[FLINK-4368] [distributed runtime] Eagerly initialize the RPC endpoint members
This closes #2351
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94e00927
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94e00927
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94e00927
Branch: refs/heads/flip-6
Commit: 94e009270386421865f360a3a90b1e5ab9a84c6a
Parents: 04bcb71
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 10 18:27:21 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 21 11:39:12 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/rpc/MainThreadExecutor.java | 9 +-
.../apache/flink/runtime/rpc/RpcEndpoint.java | 156 +++++++++++--------
.../runtime/rpc/akka/AkkaRpcServiceTest.java | 4 +-
3 files changed, 99 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/94e00927/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
index e06711e..14b2997 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -26,22 +26,23 @@ import java.util.concurrent.TimeoutException;
/**
* Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying
- * rpc server.
+ * RPC endpoint.
*
- * This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
+ * <p>This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
* implementation which allows to dispatch local procedures to the main thread of the underlying
* rpc server.
*/
public interface MainThreadExecutor {
+
/**
- * Execute the runnable in the main thread of the underlying rpc server.
+ * Execute the runnable in the main thread of the underlying RPC endpoint.
*
* @param runnable Runnable to be executed
*/
void runAsync(Runnable runnable);
/**
- * Execute the callable in the main thread of the underlying rpc server and return a future for
+ * Execute the callable in the main thread of the underlying RPC endpoint and return a future for
* the callable result. If the future is not completed within the given timeout, the returned
* future will throw a {@link TimeoutException}.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/94e00927/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 3d8757f..0d928a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -19,85 +19,116 @@
package org.apache.flink.runtime.rpc;
import akka.util.Timeout;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import java.util.concurrent.Callable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
- * Base class for rpc endpoints. Distributed components which offer remote procedure calls have to
- * extend the rpc endpoint base class.
+ * Base class for RPC endpoints. Distributed components which offer remote procedure calls have to
+ * extend the RPC endpoint base class. An RPC endpoint is backed by an {@link RpcService}.
+ *
+ * <h1>Endpoint and Gateway</h1>
+ *
+ * To be done...
+ *
+ * <h1>Single Threaded Endpoint Execution </h1>
+ *
+ * <p>All RPC calls on the same endpoint are called by the same thread
+ * (referred to as the endpoint's <i>main thread</i>).
+ * Thus, by executing all state changing operations within the main
+ * thread, we don't have to reason about concurrent accesses, in the same way in the Actor Model
+ * of Erlang or Akka.
*
- * The main idea is that a rpc endpoint is backed by a rpc server which has a single thread
- * processing the rpc calls. Thus, by executing all state changing operations within the main
- * thread, we don't have to reason about concurrent accesses. The rpc provides provides
- * {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)} and the
- * {@link #getMainThreadExecutionContext()} to execute code in the rpc server's main thread.
+ * <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)}
+ * and the {@link #getMainThreadExecutionContext()} to execute code in the RPC endoint's main thread.
*
- * @param <C> Rpc gateway counterpart for the implementing rpc endpoint
+ * @param <C> The RPC gateway counterpart for the implementing RPC endpoint
*/
public abstract class RpcEndpoint<C extends RpcGateway> {
protected final Logger log = LoggerFactory.getLogger(getClass());
- /** Rpc service to be used to start the rpc server and to obtain rpc gateways */
+ // ------------------------------------------------------------------------
+
+ /** RPC service to be used to start the RPC server and to obtain rpc gateways */
private final RpcService rpcService;
/** Self gateway which can be used to schedule asynchronous calls on yourself */
- private C self;
+ private final C self;
+
+ /** the fully qualified address of the this RPC endpoint */
+ private final String selfAddress;
+
+ /** The main thread execution context to be used to execute future callbacks in the main thread
+ * of the executing rpc server. */
+ private final MainThreadExecutionContext mainThreadExecutionContext;
+
/**
- * The main thread execution context to be used to execute future callbacks in the main thread
- * of the executing rpc server.
- *
- * IMPORTANT: The main thread context is only available after the rpc server has been started.
+ * Initializes the RPC endpoint.
+ *
+ * @param rpcService The RPC server that dispatches calls to this RPC endpoint.
*/
- private MainThreadExecutionContext mainThreadExecutionContext;
-
public RpcEndpoint(RpcService rpcService) {
- this.rpcService = rpcService;
+ this.rpcService = checkNotNull(rpcService, "rpcService");
+ this.self = rpcService.startServer(this);
+ this.selfAddress = rpcService.getAddress(self);
+ this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
}
+ // ------------------------------------------------------------------------
+ // Shutdown
+ // ------------------------------------------------------------------------
+
/**
- * Get self-gateway which should be used to run asynchronous rpc calls on this endpoint.
- *
- * IMPORTANT: Always issue local method calls via the self-gateway if the current thread
- * is not the main thread of the underlying rpc server, e.g. from within a future callback.
- *
- * @return Self gateway
+ * Shuts down the underlying RPC endpoint via the RPC service.
+ * After this method was called, the RPC endpoint will no longer be reachable, neither remotely,
+ * not via its {@link #getSelf() self gateway}. It will also not accepts executions in main thread
+ * any more (via {@link #callAsync(Callable, Timeout)} and {@link #runAsync(Runnable)}).
+ *
+ * <p>This method can be overridden to add RPC endpoint specific shut down code.
+ * The overridden method should always call the parent shut down method.
*/
- public C getSelf() {
- return self;
+ public void shutDown() {
+ rpcService.stopServer(self);
}
+ // ------------------------------------------------------------------------
+ // Basic RPC endpoint properties
+ // ------------------------------------------------------------------------
+
/**
- * Execute the runnable in the main thread of the underlying rpc server.
+ * Get self-gateway which should be used to run asynchronous RPC calls on this endpoint.
+ *
+ * <p><b>IMPORTANT</b>: Always issue local method calls via the self-gateway if the current thread
+ * is not the main thread of the underlying rpc server, e.g. from within a future callback.
*
- * @param runnable Runnable to be executed in the main thread of the underlying rpc server
+ * @return The self gateway
*/
- public void runAsync(Runnable runnable) {
- ((MainThreadExecutor) self).runAsync(runnable);
+ public C getSelf() {
+ return self;
}
/**
- * Execute the callable in the main thread of the underlying rpc server returning a future for
- * the result of the callable. If the callable is not completed within the given timeout, then
- * the future will be failed with a {@link java.util.concurrent.TimeoutException}.
+ * Gets the address of the underlying RPC endpoint. The address should be fully qualified so that
+ * a remote system can connect to this RPC endpoint via this address.
*
- * @param callable Callable to be executed in the main thread of the underlying rpc server
- * @param timeout Timeout for the callable to be completed
- * @param <V> Return type of the callable
- * @return Future for the result of the callable.
+ * @return Fully qualified address of the underlying RPC endpoint
*/
- public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
- return ((MainThreadExecutor) self).callAsync(callable, timeout);
+ public String getAddress() {
+ return selfAddress;
}
/**
* Gets the main thread execution context. The main thread execution context can be used to
- * execute tasks in the main thread of the underlying rpc server.
+ * execute tasks in the main thread of the underlying RPC endpoint.
*
* @return Main thread execution context
*/
@@ -106,52 +137,51 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
}
/**
- * Gets the used rpc service.
+ * Gets the endpoint's RPC service.
*
- * @return Rpc service
+ * @return The endpoint's RPC service
*/
public RpcService getRpcService() {
return rpcService;
}
- /**
- * Starts the underlying rpc server via the rpc service and creates the main thread execution
- * context. This makes the rpc endpoint effectively reachable from the outside.
- *
- * Can be overriden to add rpc endpoint specific start up code. Should always call the parent
- * start method.
- */
- public void start() {
- self = rpcService.startServer(this);
- mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
- }
-
+ // ------------------------------------------------------------------------
+ // Asynchronous executions
+ // ------------------------------------------------------------------------
/**
- * Shuts down the underlying rpc server via the rpc service.
+ * Execute the runnable in the main thread of the underlying RPC endpoint.
*
- * Can be overriden to add rpc endpoint specific shut down code. Should always call the parent
- * shut down method.
+ * @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint
*/
- public void shutDown() {
- rpcService.stopServer(self);
+ public void runAsync(Runnable runnable) {
+ ((MainThreadExecutor) self).runAsync(runnable);
}
/**
- * Gets the address of the underlying rpc server. The address should be fully qualified so that
- * a remote system can connect to this rpc server via this address.
+ * Execute the callable in the main thread of the underlying RPC service, returning a future for
+ * the result of the callable. If the callable is not completed within the given timeout, then
+ * the future will be failed with a {@link java.util.concurrent.TimeoutException}.
*
- * @return Fully qualified address of the underlying rpc server
+ * @param callable Callable to be executed in the main thread of the underlying rpc server
+ * @param timeout Timeout for the callable to be completed
+ * @param <V> Return type of the callable
+ * @return Future for the result of the callable.
*/
- public String getAddress() {
- return rpcService.getAddress(self);
+ public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
+ return ((MainThreadExecutor) self).callAsync(callable, timeout);
}
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
/**
* Execution context which executes runnables in the main thread context. A reported failure
* will cause the underlying rpc server to shut down.
*/
private class MainThreadExecutionContext implements ExecutionContext {
+
private final MainThreadExecutor gateway;
MainThreadExecutionContext(MainThreadExecutor gateway) {
http://git-wip-us.apache.org/repos/asf/flink/blob/94e00927/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index c5bac94..642a380 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -54,15 +54,13 @@ public class AkkaRpcServiceTest extends TestLogger {
ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService);
JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService);
- resourceManager.start();
-
ResourceManagerGateway rm = resourceManager.getSelf();
assertTrue(rm instanceof AkkaGateway);
AkkaGateway akkaClient = (AkkaGateway) rm;
- jobMaster.start();
+
jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getActorRef()));
// wait for successful registration