You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:29 UTC
[10/52] [abbrv] flink git commit: [hotfix] Treat taskManager's rpc address and location separately
[hotfix] Treat taskManager's rpc address and location separately
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7ed9a5e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7ed9a5e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7ed9a5e
Branch: refs/heads/master
Commit: a7ed9a5e3876c538deae147217c5443a287e98d5
Parents: bc68236
Author: Kurt Young <yk...@gmail.com>
Authored: Mon Oct 17 09:38:46 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/jobmaster/JobMaster.java | 7 ++--
.../runtime/jobmaster/JobMasterGateway.java | 8 +++--
.../runtime/taskexecutor/JobLeaderService.java | 37 +++++++++++++-------
.../runtime/taskexecutor/TaskExecutor.java | 2 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 14 ++++----
5 files changed, 43 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1fb5474..7bcfb3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -674,13 +674,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@RpcMethod
public Future<RegistrationResponse> registerTaskManager(
+ final String taskManagerRpcAddress,
final TaskManagerLocation taskManagerLocation,
final UUID leaderId) throws Exception
{
if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
"leader session ID {} did not equal the received leader session ID {}.",
- taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+ taskManagerLocation.getResourceID(), taskManagerRpcAddress,
JobMaster.this.leaderSessionID, leaderId);
throw new Exception("Leader id not match, expected: " + JobMaster.this.leaderSessionID
+ ", actual: " + leaderId);
@@ -696,7 +697,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
return getRpcService().execute(new Callable<TaskExecutorGateway>() {
@Override
public TaskExecutorGateway call() throws Exception {
- return getRpcService().connect(taskManagerLocation.addressString(), TaskExecutorGateway.class)
+ return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
}
}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
@@ -709,7 +710,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
"leader session ID {} did not equal the received leader session ID {}.",
- taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+ taskManagerId, taskManagerRpcAddress,
JobMaster.this.leaderSessionID, leaderId);
return new RegistrationResponse.Decline("Invalid leader session id");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 508e70a..8925d94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -185,12 +185,14 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
/**
* Register the task manager at the job manager.
*
- * @param taskManagerLocation location of the task manager
- * @param leaderId identifying the job leader
- * @param timeout for the rpc call
+ * @param taskManagerRpcAddress the rpc address of the task manager
+ * @param taskManagerLocation location of the task manager
+ * @param leaderId identifying the job leader
+ * @param timeout for the rpc call
* @return Future registration response indicating whether the registration was successful or not
*/
Future<RegistrationResponse> registerTaskManager(
+ final String taskManagerRpcAddress,
final TaskManagerLocation taskManagerLocation,
final UUID leaderId,
@RpcTimeout final Time timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 14d36ab..93c7bb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -62,6 +62,9 @@ public class JobLeaderService {
/** Internal state of the service */
private volatile JobLeaderService.State state;
+ /** Address of the owner of this service. This address is used for the job manager connection */
+ private String ownerAddress;
+
/** Rpc service to use for establishing connections */
private RpcService rpcService;
@@ -78,6 +81,7 @@ public class JobLeaderService {
state = JobLeaderService.State.CREATED;
+ ownerAddress = null;
rpcService = null;
highAvailabilityServices = null;
jobLeaderListener = null;
@@ -90,20 +94,23 @@ public class JobLeaderService {
/**
* Start the job leader service with the given services.
*
+ * @param initialOwnerAddress to be used for establishing connections (source address)
* @param initialRpcService to be used to create rpc connections
* @param initialHighAvailabilityServices to create leader retrieval services for the different jobs
* @param initialJobLeaderListener listening for job leader changes
*/
public void start(
- final RpcService initialRpcService,
- final HighAvailabilityServices initialHighAvailabilityServices,
- final JobLeaderListener initialJobLeaderListener) {
+ final String initialOwnerAddress,
+ final RpcService initialRpcService,
+ final HighAvailabilityServices initialHighAvailabilityServices,
+ final JobLeaderListener initialJobLeaderListener) {
if (JobLeaderService.State.CREATED != state) {
throw new IllegalStateException("The service has already been started.");
} else {
LOG.info("Start job leader service.");
+ this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress);
this.rpcService = Preconditions.checkNotNull(initialRpcService);
this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices);
this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener);
@@ -310,6 +317,7 @@ public class JobLeaderService {
JobMasterGateway.class,
getTargetAddress(),
getTargetLeaderId(),
+ ownerAddress,
ownLocation);
}
@@ -345,19 +353,23 @@ public class JobLeaderService {
extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess>
{
+ private final String taskManagerRpcAddress;
+
private final TaskManagerLocation taskManagerLocation;
JobManagerRetryingRegistration(
- Logger log,
- RpcService rpcService,
- String targetName,
- Class<JobMasterGateway> targetType,
- String targetAddress,
- UUID leaderId,
- TaskManagerLocation taskManagerLocation) {
-
+ Logger log,
+ RpcService rpcService,
+ String targetName,
+ Class<JobMasterGateway> targetType,
+ String targetAddress,
+ UUID leaderId,
+ String taskManagerRpcAddress,
+ TaskManagerLocation taskManagerLocation)
+ {
super(log, rpcService, targetName, targetType, targetAddress, leaderId);
+ this.taskManagerRpcAddress = taskManagerRpcAddress;
this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
}
@@ -365,7 +377,8 @@ public class JobLeaderService {
protected Future<RegistrationResponse> invokeRegistration(
JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
{
- return gateway.registerTaskManager(taskManagerLocation, leaderId, Time.milliseconds(timeoutMillis));
+ return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation,
+ leaderId, Time.milliseconds(timeoutMillis));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 1b1c02b..5146e5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -208,7 +208,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
taskSlotTable.start(new SlotActionsImpl());
// start the job leader service
- jobLeaderService.start(getRpcService(), haServices, new JobLeaderListenerImpl());
+ jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/a7ed9a5e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index a8da4fd..55cc142 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -394,9 +394,10 @@ public class TaskExecutorTest extends TestLogger {
final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
when(jobMasterGateway.registerTaskManager(
- eq(taskManagerLocation),
- eq(jobManagerLeaderId),
- any(Time.class)
+ any(String.class),
+ eq(taskManagerLocation),
+ eq(jobManagerLeaderId),
+ any(Time.class)
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
@@ -498,9 +499,10 @@ public class TaskExecutorTest extends TestLogger {
final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
when(jobMasterGateway.registerTaskManager(
- eq(taskManagerLocation),
- eq(jobManagerLeaderId),
- any(Time.class)
+ any(String.class),
+ eq(taskManagerLocation),
+ eq(jobManagerLeaderId),
+ any(Time.class)
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);