You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/08 15:28:41 UTC

[19/50] [abbrv] flink git commit: [FLINK-4368] [distributed runtime] Eagerly initialize the RPC endpoint members

[FLINK-4368] [distributed runtime] Eagerly initialize the RPC endpoint members

This closes #2351


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/506aac07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/506aac07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/506aac07

Branch: refs/heads/flip-6
Commit: 506aac073a97494c8685492b18adc65fb2786829
Parents: 0f36fb7
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 10 18:27:21 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Sep 8 17:26:56 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/MainThreadExecutor.java   |   9 +-
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 156 +++++++++++--------
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |   4 +-
 3 files changed, 99 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/506aac07/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
index e06711e..14b2997 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -26,22 +26,23 @@ import java.util.concurrent.TimeoutException;
 
 /**
  * Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying
- * rpc server.
+ * RPC endpoint.
  *
- * This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
+ * <p>This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint}
  * implementation which allows to dispatch local procedures to the main thread of the underlying
  * rpc server.
  */
 public interface MainThreadExecutor {
+
 	/**
-	 * Execute the runnable in the main thread of the underlying rpc server.
+	 * Execute the runnable in the main thread of the underlying RPC endpoint.
 	 *
 	 * @param runnable Runnable to be executed
 	 */
 	void runAsync(Runnable runnable);
 
 	/**
-	 * Execute the callable in the main thread of the underlying rpc server and return a future for
+	 * Execute the callable in the main thread of the underlying RPC endpoint and return a future for
 	 * the callable result. If the future is not completed within the given timeout, the returned
 	 * future will throw a {@link TimeoutException}.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/506aac07/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 3d8757f..0d928a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -19,85 +19,116 @@
 package org.apache.flink.runtime.rpc;
 
 import akka.util.Timeout;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
 import java.util.concurrent.Callable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Base class for rpc endpoints. Distributed components which offer remote procedure calls have to
- * extend the rpc endpoint base class.
+ * Base class for RPC endpoints. Distributed components which offer remote procedure calls have to
+ * extend the RPC endpoint base class. An RPC endpoint is backed by an {@link RpcService}. 
+ * 
+ * <h1>Endpoint and Gateway</h1>
+ * 
+ * To be done...
+ * 
+ * <h1>Single Threaded Endpoint Execution </h1>
+ * 
+ * <p>All RPC calls on the same endpoint are called by the same thread
+ * (referred to as the endpoint's <i>main thread</i>).
+ * Thus, by executing all state changing operations within the main 
+ * thread, we don't have to reason about concurrent accesses, in the same way in the Actor Model
+ * of Erlang or Akka.
  *
- * The main idea is that a rpc endpoint is backed by a rpc server which has a single thread
- * processing the rpc calls. Thus, by executing all state changing operations within the main
- * thread, we don't have to reason about concurrent accesses. The rpc provides provides
- * {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)} and the
- * {@link #getMainThreadExecutionContext()} to execute code in the rpc server's main thread.
+ * <p>The RPC endpoint provides provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)}
+  * and the {@link #getMainThreadExecutionContext()} to execute code in the RPC endoint's main thread.
  *
- * @param <C> Rpc gateway counterpart for the implementing rpc endpoint
+ * @param <C> The RPC gateway counterpart for the implementing RPC endpoint
  */
 public abstract class RpcEndpoint<C extends RpcGateway> {
 
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 
-	/** Rpc service to be used to start the rpc server and to obtain rpc gateways */
+	// ------------------------------------------------------------------------
+
+	/** RPC service to be used to start the RPC server and to obtain rpc gateways */
 	private final RpcService rpcService;
 
 	/** Self gateway which can be used to schedule asynchronous calls on yourself */
-	private C self;
+	private final C self;
+
+	/** the fully qualified address of the this RPC endpoint */
+	private final String selfAddress;
+
+	/** The main thread execution context to be used to execute future callbacks in the main thread
+	 * of the executing rpc server. */
+	private final MainThreadExecutionContext mainThreadExecutionContext;
+
 
 	/**
-	 * The main thread execution context to be used to execute future callbacks in the main thread
-	 * of the executing rpc server.
-	 *
-	 * IMPORTANT: The main thread context is only available after the rpc server has been started.
+	 * Initializes the RPC endpoint.
+	 * 
+	 * @param rpcService The RPC server that dispatches calls to this RPC endpoint. 
 	 */
-	private MainThreadExecutionContext mainThreadExecutionContext;
-
 	public RpcEndpoint(RpcService rpcService) {
-		this.rpcService = rpcService;
+		this.rpcService = checkNotNull(rpcService, "rpcService");
+		this.self = rpcService.startServer(this);
+		this.selfAddress = rpcService.getAddress(self);
+		this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Shutdown
+	// ------------------------------------------------------------------------
+
 	/**
-	 * Get self-gateway which should be used to run asynchronous rpc calls on this endpoint.
-	 *
-	 * IMPORTANT: Always issue local method calls via the self-gateway if the current thread
-	 * is not the main thread of the underlying rpc server, e.g. from within a future callback.
-	 *
-	 * @return Self gateway
+	 * Shuts down the underlying RPC endpoint via the RPC service.
+	 * After this method was called, the RPC endpoint will no longer be reachable, neither remotely,
+	 * not via its {@link #getSelf() self gateway}. It will also not accepts executions in main thread
+	 * any more (via {@link #callAsync(Callable, Timeout)} and {@link #runAsync(Runnable)}).
+	 * 
+	 * <p>This method can be overridden to add RPC endpoint specific shut down code.
+	 * The overridden method should always call the parent shut down method.
 	 */
-	public C getSelf() {
-		return self;
+	public void shutDown() {
+		rpcService.stopServer(self);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Basic RPC endpoint properties
+	// ------------------------------------------------------------------------
+
 	/**
-	 * Execute the runnable in the main thread of the underlying rpc server.
+	 * Get self-gateway which should be used to run asynchronous RPC calls on this endpoint.
+	 *
+	 * <p><b>IMPORTANT</b>: Always issue local method calls via the self-gateway if the current thread
+	 * is not the main thread of the underlying rpc server, e.g. from within a future callback.
 	 *
-	 * @param runnable Runnable to be executed in the main thread of the underlying rpc server
+	 * @return The self gateway
 	 */
-	public void runAsync(Runnable runnable) {
-		((MainThreadExecutor) self).runAsync(runnable);
+	public C getSelf() {
+		return self;
 	}
 
 	/**
-	 * Execute the callable in the main thread of the underlying rpc server returning a future for
-	 * the result of the callable. If the callable is not completed within the given timeout, then
-	 * the future will be failed with a {@link java.util.concurrent.TimeoutException}.
+	 * Gets the address of the underlying RPC endpoint. The address should be fully qualified so that
+	 * a remote system can connect to this RPC endpoint via this address.
 	 *
-	 * @param callable Callable to be executed in the main thread of the underlying rpc server
-	 * @param timeout Timeout for the callable to be completed
-	 * @param <V> Return type of the callable
-	 * @return Future for the result of the callable.
+	 * @return Fully qualified address of the underlying RPC endpoint
 	 */
-	public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
-		return ((MainThreadExecutor) self).callAsync(callable, timeout);
+	public String getAddress() {
+		return selfAddress;
 	}
 
 	/**
 	 * Gets the main thread execution context. The main thread execution context can be used to
-	 * execute tasks in the main thread of the underlying rpc server.
+	 * execute tasks in the main thread of the underlying RPC endpoint.
 	 *
 	 * @return Main thread execution context
 	 */
@@ -106,52 +137,51 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	}
 
 	/**
-	 * Gets the used rpc service.
+	 * Gets the endpoint's RPC service.
 	 *
-	 * @return Rpc service
+	 * @return The endpoint's RPC service
 	 */
 	public RpcService getRpcService() {
 		return rpcService;
 	}
 
-	/**
-	 * Starts the underlying rpc server via the rpc service and creates the main thread execution
-	 * context. This makes the rpc endpoint effectively reachable from the outside.
-	 *
-	 * Can be overriden to add rpc endpoint specific start up code. Should always call the parent
-	 * start method.
-	 */
-	public void start() {
-		self = rpcService.startServer(this);
-		mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
-	}
-
+	// ------------------------------------------------------------------------
+	//  Asynchronous executions
+	// ------------------------------------------------------------------------
 
 	/**
-	 * Shuts down the underlying rpc server via the rpc service.
+	 * Execute the runnable in the main thread of the underlying RPC endpoint.
 	 *
-	 * Can be overriden to add rpc endpoint specific shut down code. Should always call the parent
-	 * shut down method.
+	 * @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint
 	 */
-	public void shutDown() {
-		rpcService.stopServer(self);
+	public void runAsync(Runnable runnable) {
+		((MainThreadExecutor) self).runAsync(runnable);
 	}
 
 	/**
-	 * Gets the address of the underlying rpc server. The address should be fully qualified so that
-	 * a remote system can connect to this rpc server via this address.
+	 * Execute the callable in the main thread of the underlying RPC service, returning a future for
+	 * the result of the callable. If the callable is not completed within the given timeout, then
+	 * the future will be failed with a {@link java.util.concurrent.TimeoutException}.
 	 *
-	 * @return Fully qualified address of the underlying rpc server
+	 * @param callable Callable to be executed in the main thread of the underlying rpc server
+	 * @param timeout Timeout for the callable to be completed
+	 * @param <V> Return type of the callable
+	 * @return Future for the result of the callable.
 	 */
-	public String getAddress() {
-		return rpcService.getAddress(self);
+	public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
+		return ((MainThreadExecutor) self).callAsync(callable, timeout);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
 	/**
 	 * Execution context which executes runnables in the main thread context. A reported failure
 	 * will cause the underlying rpc server to shut down.
 	 */
 	private class MainThreadExecutionContext implements ExecutionContext {
+
 		private final MainThreadExecutor gateway;
 
 		MainThreadExecutionContext(MainThreadExecutor gateway) {

http://git-wip-us.apache.org/repos/asf/flink/blob/506aac07/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index c5bac94..642a380 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -54,15 +54,13 @@ public class AkkaRpcServiceTest extends TestLogger {
 		ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService);
 		JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService);
 
-		resourceManager.start();
-
 		ResourceManagerGateway rm = resourceManager.getSelf();
 
 		assertTrue(rm instanceof AkkaGateway);
 
 		AkkaGateway akkaClient = (AkkaGateway) rm;
 
-		jobMaster.start();
+		
 		jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getActorRef()));
 
 		// wait for successful registration