You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/08/09 13:07:08 UTC

flink git commit: Add MainThreadExecutionContext to RpcServer

Repository: flink
Updated Branches:
  refs/heads/flip-6 abd173521 -> aecc6a28a


Add MainThreadExecutionContext to RpcServer

In order to simplify the execution of future callbacks in the main thread,
the RpcServer now has the method getMainThreadExecutionContext, which can be
called after the RpcServer has been started. Future callbacks registered
with this context will be executed by the main thread.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aecc6a28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aecc6a28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aecc6a28

Branch: refs/heads/flip-6
Commit: aecc6a28a95d8a1df492e2096fdaa37eb125983b
Parents: abd1735
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Aug 9 15:06:28 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 9 15:06:28 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/rpc/RpcServer.java | 41 ++++++++++++++
 .../flink/runtime/rpc/jobmaster/JobMaster.java  | 58 ++++++--------------
 .../rpc/resourcemanager/ResourceManager.java    | 42 ++++----------
 3 files changed, 69 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aecc6a28/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
index 042564d..7df440f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.rpc;
 
 import akka.util.Timeout;
 import org.apache.flink.runtime.rpc.akka.RunnableAkkaGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
 import java.util.concurrent.Callable;
@@ -30,8 +33,12 @@ import java.util.concurrent.Callable;
  * @param <C> Rpc gateway counter part matching the RpcServer
  */
 public abstract class RpcServer<C extends RpcGateway> {
+
+	protected final Logger log = LoggerFactory.getLogger(getClass());
+
 	private final RpcService rpcService;
 	private C self;
+	private MainThreadExecutionContext mainThreadExecutionContext;
 
 	public RpcServer(RpcService rpcService) {
 		this.rpcService = rpcService;
@@ -57,12 +64,17 @@ public abstract class RpcServer<C extends RpcGateway> {
 		return ((RunnableAkkaGateway) self).callAsync(callable, timeout);
 	}
 
+	public ExecutionContext getMainThreadExecutionContext() {
+		return mainThreadExecutionContext;
+	}
+
 	public RpcService getRpcService() {
 		return rpcService;
 	}
 
 	public void start() {
 		self = rpcService.startServer(this);
+		mainThreadExecutionContext = new MainThreadExecutionContext((RunnableAkkaGateway) self);
 	}
 
 	public void shutDown() {
@@ -72,4 +84,33 @@ public abstract class RpcServer<C extends RpcGateway> {
 	public String getAddress() {
 		return rpcService.getAddress(self);
 	}
+
+	public class MainThreadExecutionContext implements ExecutionContext {
+		private final RunnableAkkaGateway gateway;
+
+		public MainThreadExecutionContext(RunnableAkkaGateway gateway) {
+			this.gateway = gateway;
+		}
+
+		@Override
+		public void execute(Runnable runnable) {
+			gateway.runAsync(runnable);
+		}
+
+		@Override
+		public void reportFailure(final Throwable t) {
+			gateway.runAsync(new Runnable() {
+				@Override
+				public void run() {
+					log.error("Encountered failure in the main thread execution context.", t);
+					shutDown();
+				}
+			});
+		}
+
+		@Override
+		public ExecutionContext prepare() {
+			return this;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aecc6a28/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
index dc1fc6d..2ac1a59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc.jobmaster;
 
+import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -41,7 +42,6 @@ import scala.concurrent.duration.FiniteDuration;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -56,8 +56,6 @@ public class JobMaster extends RpcServer<JobMasterGateway> {
 	private final FiniteDuration registrationDuration = new FiniteDuration(365, TimeUnit.DAYS);
 	private final long failedRegistrationDelay = 10000;
 
-	private ScheduledFuture<?> scheduledRegistration;
-
 	private ResourceManagerGateway resourceManager = null;
 
 	private UUID currentRegistrationRun;
@@ -111,21 +109,16 @@ public class JobMaster extends RpcServer<JobMasterGateway> {
 			if (deadline.isOverdue()) {
 				// we've exceeded our registration deadline. This means that we have to shutdown the JobMaster
 				LOG.error("Exceeded registration deadline without successfully registering at the ResourceManager.");
-				runAsync(new Runnable() {
-					@Override
-					public void run() {
-						shutDown();
-					}
-				});
+				shutDown();
 			} else {
-				Future<RegistrationResponse> registrationResponseFuture = resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, Future<RegistrationResponse>>() {
+				Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> registrationResponseFuture = resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, Future<Tuple2<RegistrationResponse, ResourceManagerGateway>>>() {
 					@Override
-					public Future<RegistrationResponse> apply(ResourceManagerGateway resourceManagerGateway) {
-						return resourceManagerGateway.registerJobMaster(jobMasterRegistration, timeout);
+					public Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> apply(ResourceManagerGateway resourceManagerGateway) {
+						return resourceManagerGateway.registerJobMaster(jobMasterRegistration, timeout).zip(Futures.successful(resourceManagerGateway));
 					}
 				}, executionContext);
 
-				registrationResponseFuture.zip(resourceManagerFuture).onComplete(new OnComplete<Tuple2<RegistrationResponse, ResourceManagerGateway>>() {
+				registrationResponseFuture.onComplete(new OnComplete<Tuple2<RegistrationResponse, ResourceManagerGateway>>() {
 					@Override
 					public void onComplete(Throwable failure, Tuple2<RegistrationResponse, ResourceManagerGateway> tuple) throws Throwable {
 						if (failure != null) {
@@ -134,41 +127,24 @@ public class JobMaster extends RpcServer<JobMasterGateway> {
 								// so increase it and try again.
 								final FiniteDuration newTimeout = timeout.$times(2L).min(maxTimeout);
 
-								// we have to execute handleResourceManagerRegistration in the main thread
-								// because we need consistency wrt currentRegistrationRun
-								runAsync(new Runnable() {
-									@Override
-									public void run() {
-										handleResourceManagerRegistration(
-											jobMasterRegistration,
-											attemptNumber + 1,
-											resourceManagerFuture,
-											registrationRun,
-											newTimeout,
-											maxTimeout,
-											deadline);
-									}
-								});
+								handleResourceManagerRegistration(
+									jobMasterRegistration,
+									attemptNumber + 1,
+									resourceManagerFuture,
+									registrationRun,
+									newTimeout,
+									maxTimeout,
+									deadline);
 							} else {
 								LOG.error("Received unknown error while registering at the ResourceManager.", failure);
-								runAsync(new Runnable() {
-									@Override
-									public void run() {
-										shutDown();
-									}
-								});
+								shutDown();
 							}
 						} else {
 							final RegistrationResponse response = tuple._1();
 							final ResourceManagerGateway gateway = tuple._2();
 
 							if (response.isSuccess()) {
-								runAsync(new Runnable() {
-									@Override
-									public void run() {
-										finishResourceManagerRegistration(gateway, response.getInstanceID());
-									}
-								});
+								finishResourceManagerRegistration(gateway, response.getInstanceID());
 							} else {
 								LOG.info("The registration was refused. Try again.");
 
@@ -196,7 +172,7 @@ public class JobMaster extends RpcServer<JobMasterGateway> {
 							}
 						}
 					}
-				}, executionContext);
+				}, getMainThreadExecutionContext()); // use the main thread execution context to execute the call back in the main thread
 			}
 		} else {
 			LOG.info("Discard out-dated registration run.");

http://git-wip-us.apache.org/repos/asf/flink/blob/aecc6a28/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
index 498c3fc..1e2f474 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -19,8 +19,6 @@
 package org.apache.flink.runtime.rpc.resourcemanager;
 
 import akka.dispatch.Mapper;
-import akka.dispatch.Recover;
-import akka.util.Timeout;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcServer;
@@ -32,14 +30,11 @@ import scala.concurrent.Future;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 public class ResourceManager extends RpcServer<ResourceManagerGateway> {
 	private final ExecutionContext executionContext;
 	private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
-	private final Timeout callableTimeout = new Timeout(10, TimeUnit.SECONDS);
 
 	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
 		super(rpcService);
@@ -51,36 +46,21 @@ public class ResourceManager extends RpcServer<ResourceManagerGateway> {
 	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
 		Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class);
 
-		return jobMasterFuture.flatMap(new Mapper<JobMasterGateway, Future<RegistrationResponse>>() {
+		return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() {
 			@Override
-			public Future<RegistrationResponse> apply(final JobMasterGateway jobMasterGateway) {
-				Future<InstanceID> instanceIDFuture = callAsync(new Callable<InstanceID> () {
-					@Override
-					public InstanceID call() throws Exception {
-						if (jobMasterGateways.containsKey(jobMasterGateway)) {
-							return jobMasterGateways.get(jobMasterGateway);
-						} else {
-							InstanceID instanceID = new InstanceID();
-							jobMasterGateways.put(jobMasterGateway, instanceID);
+			public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
+				InstanceID instanceID;
 
-							return instanceID;
-						}
-					}
-				}, callableTimeout);
+				if (jobMasterGateways.containsKey(jobMasterGateway)) {
+					instanceID = jobMasterGateways.get(jobMasterGateway);
+				} else {
+					instanceID = new InstanceID();
+					jobMasterGateways.put(jobMasterGateway, instanceID);
+				}
 
-				return instanceIDFuture.map(new Mapper<InstanceID, RegistrationResponse>() {
-					@Override
-					public RegistrationResponse apply(InstanceID parameter) {
-						return new RegistrationResponse(true, parameter);
-					}
-				}, executionContext).recover(new Recover<RegistrationResponse>() {
-					@Override
-					public RegistrationResponse recover(Throwable failure) throws Throwable {
-						return new RegistrationResponse(false, null);
-					}
-				}, executionContext);
+				return new RegistrationResponse(true, instanceID);
 			}
-		}, executionContext);
+		}, getMainThreadExecutionContext());
 	}
 
 	@RpcMethod