You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:29 UTC

[10/52] [abbrv] flink git commit: [hotfix] Treat taskManager's rpc address and location separately

[hotfix] Treat taskManager's rpc address and location separately


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7ed9a5e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7ed9a5e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7ed9a5e

Branch: refs/heads/master
Commit: a7ed9a5e3876c538deae147217c5443a287e98d5
Parents: bc68236
Author: Kurt Young <yk...@gmail.com>
Authored: Mon Oct 17 09:38:46 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      |  7 ++--
 .../runtime/jobmaster/JobMasterGateway.java     |  8 +++--
 .../runtime/taskexecutor/JobLeaderService.java  | 37 +++++++++++++-------
 .../runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 14 ++++----
 5 files changed, 43 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1fb5474..7bcfb3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -674,13 +674,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	@RpcMethod
 	public Future<RegistrationResponse> registerTaskManager(
+			final String taskManagerRpcAddress,
 			final TaskManagerLocation taskManagerLocation,
 			final UUID leaderId) throws Exception
 	{
 		if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
 			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
 							"leader session ID {} did not equal the received leader session ID {}.",
-					taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+					taskManagerLocation.getResourceID(), taskManagerRpcAddress,
 					JobMaster.this.leaderSessionID, leaderId);
 			throw new Exception("Leader id not match, expected: " + JobMaster.this.leaderSessionID
 					+ ", actual: " + leaderId);
@@ -696,7 +697,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			return getRpcService().execute(new Callable<TaskExecutorGateway>() {
 				@Override
 				public TaskExecutorGateway call() throws Exception {
-					return getRpcService().connect(taskManagerLocation.addressString(), TaskExecutorGateway.class)
+					return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
 							.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
 				}
 			}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
@@ -709,7 +710,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 					if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
 						log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
 										"leader session ID {} did not equal the received leader session ID {}.",
-								taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+								taskManagerId, taskManagerRpcAddress,
 								JobMaster.this.leaderSessionID, leaderId);
 						return new RegistrationResponse.Decline("Invalid leader session id");
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 508e70a..8925d94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -185,12 +185,14 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	/**
 	 * Register the task manager at the job manager.
 	 *
-	 * @param taskManagerLocation location of the task manager
-	 * @param leaderId            identifying the job leader
-	 * @param timeout             for the rpc call
+	 * @param taskManagerRpcAddress the rpc address of the task manager
+	 * @param taskManagerLocation   location of the task manager
+	 * @param leaderId              identifying the job leader
+	 * @param timeout               for the rpc call
 	 * @return Future registration response indicating whether the registration was successful or not
 	 */
 	Future<RegistrationResponse> registerTaskManager(
+			final String taskManagerRpcAddress,
 			final TaskManagerLocation taskManagerLocation,
 			final UUID leaderId,
 			@RpcTimeout final Time timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 14d36ab..93c7bb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -62,6 +62,9 @@ public class JobLeaderService {
 	/** Internal state of the service */
 	private volatile JobLeaderService.State state;
 
+	/** Address of the owner of this service. This address is used for the job manager connection */
+	private String ownerAddress;
+
 	/** Rpc service to use for establishing connections */
 	private RpcService rpcService;
 
@@ -78,6 +81,7 @@ public class JobLeaderService {
 
 		state = JobLeaderService.State.CREATED;
 
+		ownerAddress = null;
 		rpcService = null;
 		highAvailabilityServices = null;
 		jobLeaderListener = null;
@@ -90,20 +94,23 @@ public class JobLeaderService {
 	/**
 	 * Start the job leader service with the given services.
 	 *
+	 * @param initialOwnerAddress to be used for establishing connections (source address)
 	 * @param initialRpcService to be used to create rpc connections
 	 * @param initialHighAvailabilityServices to create leader retrieval services for the different jobs
 	 * @param initialJobLeaderListener listening for job leader changes
 	 */
 	public void start(
-		final RpcService initialRpcService,
-		final HighAvailabilityServices initialHighAvailabilityServices,
-		final JobLeaderListener initialJobLeaderListener) {
+			final String initialOwnerAddress,
+			final RpcService initialRpcService,
+			final HighAvailabilityServices initialHighAvailabilityServices,
+			final JobLeaderListener initialJobLeaderListener) {
 
 		if (JobLeaderService.State.CREATED != state) {
 			throw new IllegalStateException("The service has already been started.");
 		} else {
 			LOG.info("Start job leader service.");
 
+			this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress);
 			this.rpcService = Preconditions.checkNotNull(initialRpcService);
 			this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices);
 			this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener);
@@ -310,6 +317,7 @@ public class JobLeaderService {
 						JobMasterGateway.class,
 						getTargetAddress(),
 						getTargetLeaderId(),
+						ownerAddress,
 						ownLocation);
 			}
 
@@ -345,19 +353,23 @@ public class JobLeaderService {
 			extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess>
 	{
 
+		private final String taskManagerRpcAddress;
+
 		private final TaskManagerLocation taskManagerLocation;
 
 		JobManagerRetryingRegistration(
-			Logger log,
-			RpcService rpcService,
-			String targetName,
-			Class<JobMasterGateway> targetType,
-			String targetAddress,
-			UUID leaderId,
-			TaskManagerLocation taskManagerLocation) {
-
+				Logger log,
+				RpcService rpcService,
+				String targetName,
+				Class<JobMasterGateway> targetType,
+				String targetAddress,
+				UUID leaderId,
+				String taskManagerRpcAddress,
+				TaskManagerLocation taskManagerLocation)
+		{
 			super(log, rpcService, targetName, targetType, targetAddress, leaderId);
 
+			this.taskManagerRpcAddress = taskManagerRpcAddress;
 			this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
 		}
 
@@ -365,7 +377,8 @@ public class JobLeaderService {
 		protected Future<RegistrationResponse> invokeRegistration(
 				JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
 		{
-			return gateway.registerTaskManager(taskManagerLocation, leaderId, Time.milliseconds(timeoutMillis));
+			return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation,
+					leaderId, Time.milliseconds(timeoutMillis));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 1b1c02b..5146e5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -208,7 +208,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		taskSlotTable.start(new SlotActionsImpl());
 
 		// start the job leader service
-		jobLeaderService.start(getRpcService(), haServices, new JobLeaderListenerImpl());
+		jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index a8da4fd..55cc142 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -394,9 +394,10 @@ public class TaskExecutorTest extends TestLogger {
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 
 		when(jobMasterGateway.registerTaskManager(
-			eq(taskManagerLocation),
-			eq(jobManagerLeaderId),
-			any(Time.class)
+				any(String.class),
+				eq(taskManagerLocation),
+				eq(jobManagerLeaderId),
+				any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
 
@@ -498,9 +499,10 @@ public class TaskExecutorTest extends TestLogger {
 		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
 
 		when(jobMasterGateway.registerTaskManager(
-			eq(taskManagerLocation),
-			eq(jobManagerLeaderId),
-			any(Time.class)
+				any(String.class),
+				eq(taskManagerLocation),
+				eq(jobManagerLeaderId),
+				any(Time.class)
 		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
 		when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);