You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:30 UTC

[11/52] [abbrv] flink git commit: [FLINK-4351] [cluster management] JobManager handle TaskManager's registration

[FLINK-4351] [cluster management] JobManager handle TaskManager's registration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc682361
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc682361
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc682361

Branch: refs/heads/master
Commit: bc6823618a1cf01a3385478bb1913940d9f604bc
Parents: 5cbec02
Author: Kurt Young <yk...@gmail.com>
Authored: Sun Oct 16 23:00:57 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      | 75 +++++++++++++++++---
 .../runtime/jobmaster/JobMasterGateway.java     | 15 ++--
 .../runtime/taskexecutor/JobLeaderService.java  | 55 ++++++--------
 .../runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../taskexecutor/TaskManagerServices.java       |  2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 10 ++-
 6 files changed, 102 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc682361/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e6720fd..1fb5474 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -36,7 +37,9 @@ import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -81,7 +84,9 @@ import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
@@ -89,8 +94,10 @@ import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -122,6 +129,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** Configuration of the JobManager */
 	private final Configuration configuration;
 
+	private final Time rpcTimeout;
+
 	/** Service to contend for and retrieve the leadership of JM and RM */
 	private final HighAvailabilityServices highAvailabilityServices;
 
@@ -152,7 +161,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	private volatile UUID leaderSessionID;
 
-	// --------- resource manager --------
+	// --------- ResourceManager --------
 
 	/** Leader retriever service used to locate ResourceManager's address */
 	private LeaderRetrievalService resourceManagerLeaderRetriever;
@@ -160,6 +169,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** Connection with ResourceManager, null if not located address yet or we close it initiative */
 	private ResourceManagerConnection resourceManagerConnection;
 
+	// --------- TaskManagers --------
+
+	private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
 
 	// ------------------------------------------------------------------------
 
@@ -181,6 +193,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		this.jobGraph = checkNotNull(jobGraph);
 		this.configuration = checkNotNull(configuration);
+		this.rpcTimeout = rpcAskTimeout;
 		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
 		this.executionContext = checkNotNull(executorService);
@@ -244,6 +257,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		this.slotPool = new SlotPool(executorService);
 		this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
+
+		this.registeredTaskManagers = new HashMap<>(4);
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -380,8 +395,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		}
 		closeResourceManagerConnection();
 
-		// TODO: disconnect from all registered task managers
-
+		for (ResourceID taskManagerId : registeredTaskManagers.keySet()) {
+			slotPool.releaseResource(taskManagerId);
+		}
+		registeredTaskManagers.clear();
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -656,11 +673,53 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	@RpcMethod
-	public RegistrationResponse registerTaskManager(
-		final String taskManagerAddress,
-		final ResourceID taskManagerProcessId,
-		final UUID leaderId) {
-		throw new UnsupportedOperationException("Has to be implemented.");
+	public Future<RegistrationResponse> registerTaskManager(
+			final TaskManagerLocation taskManagerLocation,
+			final UUID leaderId) throws Exception
+	{
+		if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
+			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
+							"leader session ID {} did not equal the received leader session ID {}.",
+					taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+					JobMaster.this.leaderSessionID, leaderId);
+			throw new Exception("Leader id not match, expected: " + JobMaster.this.leaderSessionID
+					+ ", actual: " + leaderId);
+		}
+
+		final ResourceID taskManagerId = taskManagerLocation.getResourceID();
+
+		if (registeredTaskManagers.containsKey(taskManagerId)) {
+			final RegistrationResponse response = new JMTMRegistrationSuccess(
+					taskManagerId, libraryCacheManager.getBlobServerPort());
+			return FlinkCompletableFuture.completed(response);
+		} else {
+			return getRpcService().execute(new Callable<TaskExecutorGateway>() {
+				@Override
+				public TaskExecutorGateway call() throws Exception {
+					return getRpcService().connect(taskManagerLocation.addressString(), TaskExecutorGateway.class)
+							.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
+				}
+			}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
+				@Override
+				public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+					if (throwable != null) {
+						return new RegistrationResponse.Decline(throwable.getMessage());
+					}
+
+					if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
+						log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
+										"leader session ID {} did not equal the received leader session ID {}.",
+								taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+								JobMaster.this.leaderSessionID, leaderId);
+						return new RegistrationResponse.Decline("Invalid leader session id");
+					}
+
+					slotPool.registerResource(taskManagerId);
+					registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
+					return new JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort());
+				}
+			}, getMainThreadExecutor());
+		}
 	}
 
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bc682361/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 17b4194..508e70a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.util.UUID;
 
@@ -184,15 +185,13 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	/**
 	 * Register the task manager at the job manager.
 	 *
-	 * @param taskManagerAddress address of the task manager
-	 * @param taskManagerProcessId identifying the task manager
-	 * @param leaderId identifying the job leader
-	 * @param timeout for the rpc call
+	 * @param taskManagerLocation location of the task manager
+	 * @param leaderId            identifying the job leader
+	 * @param timeout             for the rpc call
 	 * @return Future registration response indicating whether the registration was successful or not
 	 */
 	Future<RegistrationResponse> registerTaskManager(
-		final String taskManagerAddress,
-		final ResourceID taskManagerProcessId,
-		final UUID leaderId,
-		@RpcTimeout final Time timeout);
+			final TaskManagerLocation taskManagerLocation,
+			final UUID leaderId,
+			@RpcTimeout final Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc682361/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index e7f52e2..14d36ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -32,6 +31,7 @@ import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,8 +53,8 @@ public class JobLeaderService {
 
 	private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class);
 
-	/** Process id of the owning process */
-	private final ResourceID ownerProcessId;
+	/** Self's location, used for the job manager connection */
+	private final TaskManagerLocation ownLocation;
 
 	/** The leader retrieval service and listener for each registered job */
 	private final Map<JobID, Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener>> jobLeaderServices;
@@ -62,9 +62,6 @@ public class JobLeaderService {
 	/** Internal state of the service */
 	private volatile JobLeaderService.State state;
 
-	/** Address of the owner of this service. This address is used for the job manager connection */
-	private String ownerAddress;
-
 	/** Rpc service to use for establishing connections */
 	private RpcService rpcService;
 
@@ -74,14 +71,13 @@ public class JobLeaderService {
 	/** Job leader listener listening for job leader changes */
 	private JobLeaderListener jobLeaderListener;
 
-	public JobLeaderService(ResourceID ownerProcessId) {
-		this.ownerProcessId = Preconditions.checkNotNull(ownerProcessId);
+	public JobLeaderService(TaskManagerLocation location) {
+		this.ownLocation = Preconditions.checkNotNull(location);
 
 		jobLeaderServices = new HashMap<>(4);
 
 		state = JobLeaderService.State.CREATED;
 
-		ownerAddress = null;
 		rpcService = null;
 		highAvailabilityServices = null;
 		jobLeaderListener = null;
@@ -94,13 +90,11 @@ public class JobLeaderService {
 	/**
 	 * Start the job leader service with the given services.
 	 *
-	 * @param initialOwnerAddress to be used for establishing connections (source address)
 	 * @param initialRpcService to be used to create rpc connections
 	 * @param initialHighAvailabilityServices to create leader retrieval services for the different jobs
 	 * @param initialJobLeaderListener listening for job leader changes
 	 */
 	public void start(
-		final String initialOwnerAddress,
 		final RpcService initialRpcService,
 		final HighAvailabilityServices initialHighAvailabilityServices,
 		final JobLeaderListener initialJobLeaderListener) {
@@ -110,7 +104,6 @@ public class JobLeaderService {
 		} else {
 			LOG.info("Start job leader service.");
 
-			this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress);
 			this.rpcService = Preconditions.checkNotNull(initialRpcService);
 			this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices);
 			this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener);
@@ -311,14 +304,13 @@ public class JobLeaderService {
 			@Override
 			protected RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() {
 				return new JobLeaderService.JobManagerRetryingRegistration(
-					LOG,
-					rpcService,
-					"JobManager",
-					JobMasterGateway.class,
-					getTargetAddress(),
-					getTargetLeaderId(),
-					ownerAddress,
-					ownerProcessId);
+						LOG,
+						rpcService,
+						"JobManager",
+						JobMasterGateway.class,
+						getTargetAddress(),
+						getTargetLeaderId(),
+						ownLocation);
 			}
 
 			@Override
@@ -349,10 +341,11 @@ public class JobLeaderService {
 	/**
 	 * Retrying registration for the job manager <--> task manager connection.
 	 */
-	private static final class JobManagerRetryingRegistration extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> {
+	private static final class JobManagerRetryingRegistration
+			extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess>
+	{
 
-		private final String taskManagerAddress;
-		private final ResourceID taskManagerProcessId;
+		private final TaskManagerLocation taskManagerLocation;
 
 		JobManagerRetryingRegistration(
 			Logger log,
@@ -361,22 +354,18 @@ public class JobLeaderService {
 			Class<JobMasterGateway> targetType,
 			String targetAddress,
 			UUID leaderId,
-			String taskManagerAddress,
-			ResourceID taskManagerProcessId) {
+			TaskManagerLocation taskManagerLocation) {
 
 			super(log, rpcService, targetName, targetType, targetAddress, leaderId);
 
-			this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
-			this.taskManagerProcessId = Preconditions.checkNotNull(taskManagerProcessId);
+			this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		}
 
 		@Override
-		protected Future<RegistrationResponse> invokeRegistration(JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception {
-			return gateway.registerTaskManager(
-				taskManagerAddress,
-				taskManagerProcessId,
-				leaderId,
-				Time.milliseconds(timeoutMillis));
+		protected Future<RegistrationResponse> invokeRegistration(
+				JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
+		{
+			return gateway.registerTaskManager(taskManagerLocation, leaderId, Time.milliseconds(timeoutMillis));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc682361/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5146e5b..1b1c02b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -208,7 +208,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		taskSlotTable.start(new SlotActionsImpl());
 
 		// start the job leader service
-		jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
+		jobLeaderService.start(getRpcService(), haServices, new JobLeaderListenerImpl());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/bc682361/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 54f2332..b57fafe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -210,7 +210,7 @@ public class TaskManagerServices {
 
 		final JobManagerTable jobManagerTable = new JobManagerTable();
 
-		final JobLeaderService jobLeaderService = new JobLeaderService(resourceID);
+		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 		
 		return new TaskManagerServices(
 			taskManagerLocation,

http://git-wip-us.apache.org/repos/asf/flink/blob/bc682361/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 8d6cba2..a8da4fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -365,7 +365,7 @@ public class TaskExecutorTest extends TestLogger {
 		final TimerService<AllocationID> timerService = mock(TimerService.class);
 		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), timerService);
 		final JobManagerTable jobManagerTable = new JobManagerTable();
-		final JobLeaderService jobLeaderService = new JobLeaderService(resourceId);
+		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 		final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
@@ -394,8 +394,7 @@ public class TaskExecutorTest extends TestLogger {
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 
 		when(jobMasterGateway.registerTaskManager(
-			any(String.class),
-			eq(resourceId),
+			eq(taskManagerLocation),
 			eq(jobManagerLeaderId),
 			any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
@@ -466,7 +465,7 @@ public class TaskExecutorTest extends TestLogger {
 		final TimerService<AllocationID> timerService = mock(TimerService.class);
 		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), mock(ResourceProfile.class)), timerService);
 		final JobManagerTable jobManagerTable = new JobManagerTable();
-		final JobLeaderService jobLeaderService = new JobLeaderService(resourceId);
+		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 		final String resourceManagerAddress = "rm";
@@ -499,8 +498,7 @@ public class TaskExecutorTest extends TestLogger {
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 
 		when(jobMasterGateway.registerTaskManager(
-			any(String.class),
-			eq(resourceId),
+			eq(taskManagerLocation),
 			eq(jobManagerLeaderId),
 			any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));