You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:30 UTC
[11/52] [abbrv] flink git commit: [FLINK-4351] [cluster management]
JobManager handle TaskManager's registration
[FLINK-4351] [cluster management] JobManager handle TaskManager's registration
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc682361
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc682361
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc682361
Branch: refs/heads/master
Commit: bc6823618a1cf01a3385478bb1913940d9f604bc
Parents: 5cbec02
Author: Kurt Young <yk...@gmail.com>
Authored: Sun Oct 16 23:00:57 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/jobmaster/JobMaster.java | 75 +++++++++++++++++---
.../runtime/jobmaster/JobMasterGateway.java | 15 ++--
.../runtime/taskexecutor/JobLeaderService.java | 55 ++++++--------
.../runtime/taskexecutor/TaskExecutor.java | 2 +-
.../taskexecutor/TaskManagerServices.java | 2 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 10 ++-
6 files changed, 102 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bc682361/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e6720fd..1fb5474 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
@@ -36,7 +37,9 @@ import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.Execution;
@@ -81,7 +84,9 @@ import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;
@@ -89,8 +94,10 @@ import org.slf4j.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -122,6 +129,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
/** Configuration of the JobManager */
private final Configuration configuration;
+ private final Time rpcTimeout;
+
/** Service to contend for and retrieve the leadership of JM and RM */
private final HighAvailabilityServices highAvailabilityServices;
@@ -152,7 +161,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
private volatile UUID leaderSessionID;
- // --------- resource manager --------
+ // --------- ResourceManager --------
/** Leader retriever service used to locate ResourceManager's address */
private LeaderRetrievalService resourceManagerLeaderRetriever;
@@ -160,6 +169,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
/** Connection with ResourceManager, null if not located address yet or we close it initiative */
private ResourceManagerConnection resourceManagerConnection;
+ // --------- TaskManagers --------
+
+ private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
// ------------------------------------------------------------------------
@@ -181,6 +193,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
this.jobGraph = checkNotNull(jobGraph);
this.configuration = checkNotNull(configuration);
+ this.rpcTimeout = rpcAskTimeout;
this.highAvailabilityServices = checkNotNull(highAvailabilityService);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
this.executionContext = checkNotNull(executorService);
@@ -244,6 +257,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
this.slotPool = new SlotPool(executorService);
this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
+
+ this.registeredTaskManagers = new HashMap<>(4);
}
//----------------------------------------------------------------------------------------------
@@ -380,8 +395,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
closeResourceManagerConnection();
- // TODO: disconnect from all registered task managers
-
+ for (ResourceID taskManagerId : registeredTaskManagers.keySet()) {
+ slotPool.releaseResource(taskManagerId);
+ }
+ registeredTaskManagers.clear();
}
//----------------------------------------------------------------------------------------------
@@ -656,11 +673,53 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
@RpcMethod
- public RegistrationResponse registerTaskManager(
- final String taskManagerAddress,
- final ResourceID taskManagerProcessId,
- final UUID leaderId) {
- throw new UnsupportedOperationException("Has to be implemented.");
+ public Future<RegistrationResponse> registerTaskManager(
+ final TaskManagerLocation taskManagerLocation,
+ final UUID leaderId) throws Exception
+ {
+ if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
+ log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
+ "leader session ID {} did not equal the received leader session ID {}.",
+ taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+ JobMaster.this.leaderSessionID, leaderId);
+ throw new Exception("Leader id not match, expected: " + JobMaster.this.leaderSessionID
+ + ", actual: " + leaderId);
+ }
+
+ final ResourceID taskManagerId = taskManagerLocation.getResourceID();
+
+ if (registeredTaskManagers.containsKey(taskManagerId)) {
+ final RegistrationResponse response = new JMTMRegistrationSuccess(
+ taskManagerId, libraryCacheManager.getBlobServerPort());
+ return FlinkCompletableFuture.completed(response);
+ } else {
+ return getRpcService().execute(new Callable<TaskExecutorGateway>() {
+ @Override
+ public TaskExecutorGateway call() throws Exception {
+ return getRpcService().connect(taskManagerLocation.addressString(), TaskExecutorGateway.class)
+ .get(rpcTimeout.getSize(), rpcTimeout.getUnit());
+ }
+ }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
+ @Override
+ public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
+ if (throwable != null) {
+ return new RegistrationResponse.Decline(throwable.getMessage());
+ }
+
+ if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
+ log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
+ "leader session ID {} did not equal the received leader session ID {}.",
+ taskManagerLocation.getResourceID(), taskManagerLocation.addressString(),
+ JobMaster.this.leaderSessionID, leaderId);
+ return new RegistrationResponse.Decline("Invalid leader session id");
+ }
+
+ slotPool.registerResource(taskManagerId);
+ registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
+ return new JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort());
+ }
+ }, getMainThreadExecutor());
+ }
}
//----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bc682361/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 17b4194..508e70a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import java.util.UUID;
@@ -184,15 +185,13 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
/**
* Register the task manager at the job manager.
*
- * @param taskManagerAddress address of the task manager
- * @param taskManagerProcessId identifying the task manager
- * @param leaderId identifying the job leader
- * @param timeout for the rpc call
+ * @param taskManagerLocation location of the task manager
+ * @param leaderId identifying the job leader
+ * @param timeout for the rpc call
* @return Future registration response indicating whether the registration was successful or not
*/
Future<RegistrationResponse> registerTaskManager(
- final String taskManagerAddress,
- final ResourceID taskManagerProcessId,
- final UUID leaderId,
- @RpcTimeout final Time timeout);
+ final TaskManagerLocation taskManagerLocation,
+ final UUID leaderId,
+ @RpcTimeout final Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bc682361/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index e7f52e2..14d36ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -32,6 +31,7 @@ import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,8 +53,8 @@ public class JobLeaderService {
private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class);
- /** Process id of the owning process */
- private final ResourceID ownerProcessId;
+ /** Location of the owning task manager, used for the job manager connection */
+ private final TaskManagerLocation ownLocation;
/** The leader retrieval service and listener for each registered job */
private final Map<JobID, Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener>> jobLeaderServices;
@@ -62,9 +62,6 @@ public class JobLeaderService {
/** Internal state of the service */
private volatile JobLeaderService.State state;
- /** Address of the owner of this service. This address is used for the job manager connection */
- private String ownerAddress;
-
/** Rpc service to use for establishing connections */
private RpcService rpcService;
@@ -74,14 +71,13 @@ public class JobLeaderService {
/** Job leader listener listening for job leader changes */
private JobLeaderListener jobLeaderListener;
- public JobLeaderService(ResourceID ownerProcessId) {
- this.ownerProcessId = Preconditions.checkNotNull(ownerProcessId);
+ public JobLeaderService(TaskManagerLocation location) {
+ this.ownLocation = Preconditions.checkNotNull(location);
jobLeaderServices = new HashMap<>(4);
state = JobLeaderService.State.CREATED;
- ownerAddress = null;
rpcService = null;
highAvailabilityServices = null;
jobLeaderListener = null;
@@ -94,13 +90,11 @@ public class JobLeaderService {
/**
* Start the job leader service with the given services.
*
- * @param initialOwnerAddress to be used for establishing connections (source address)
* @param initialRpcService to be used to create rpc connections
* @param initialHighAvailabilityServices to create leader retrieval services for the different jobs
* @param initialJobLeaderListener listening for job leader changes
*/
public void start(
- final String initialOwnerAddress,
final RpcService initialRpcService,
final HighAvailabilityServices initialHighAvailabilityServices,
final JobLeaderListener initialJobLeaderListener) {
@@ -110,7 +104,6 @@ public class JobLeaderService {
} else {
LOG.info("Start job leader service.");
- this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress);
this.rpcService = Preconditions.checkNotNull(initialRpcService);
this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices);
this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener);
@@ -311,14 +304,13 @@ public class JobLeaderService {
@Override
protected RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() {
return new JobLeaderService.JobManagerRetryingRegistration(
- LOG,
- rpcService,
- "JobManager",
- JobMasterGateway.class,
- getTargetAddress(),
- getTargetLeaderId(),
- ownerAddress,
- ownerProcessId);
+ LOG,
+ rpcService,
+ "JobManager",
+ JobMasterGateway.class,
+ getTargetAddress(),
+ getTargetLeaderId(),
+ ownLocation);
}
@Override
@@ -349,10 +341,11 @@ public class JobLeaderService {
/**
* Retrying registration for the job manager <--> task manager connection.
*/
- private static final class JobManagerRetryingRegistration extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> {
+ private static final class JobManagerRetryingRegistration
+ extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess>
+ {
- private final String taskManagerAddress;
- private final ResourceID taskManagerProcessId;
+ private final TaskManagerLocation taskManagerLocation;
JobManagerRetryingRegistration(
Logger log,
@@ -361,22 +354,18 @@ public class JobLeaderService {
Class<JobMasterGateway> targetType,
String targetAddress,
UUID leaderId,
- String taskManagerAddress,
- ResourceID taskManagerProcessId) {
+ TaskManagerLocation taskManagerLocation) {
super(log, rpcService, targetName, targetType, targetAddress, leaderId);
- this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
- this.taskManagerProcessId = Preconditions.checkNotNull(taskManagerProcessId);
+ this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
}
@Override
- protected Future<RegistrationResponse> invokeRegistration(JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception {
- return gateway.registerTaskManager(
- taskManagerAddress,
- taskManagerProcessId,
- leaderId,
- Time.milliseconds(timeoutMillis));
+ protected Future<RegistrationResponse> invokeRegistration(
+ JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
+ {
+ return gateway.registerTaskManager(taskManagerLocation, leaderId, Time.milliseconds(timeoutMillis));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bc682361/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5146e5b..1b1c02b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -208,7 +208,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
taskSlotTable.start(new SlotActionsImpl());
// start the job leader service
- jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
+ jobLeaderService.start(getRpcService(), haServices, new JobLeaderListenerImpl());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/bc682361/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 54f2332..b57fafe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -210,7 +210,7 @@ public class TaskManagerServices {
final JobManagerTable jobManagerTable = new JobManagerTable();
- final JobLeaderService jobLeaderService = new JobLeaderService(resourceID);
+ final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
return new TaskManagerServices(
taskManagerLocation,
http://git-wip-us.apache.org/repos/asf/flink/blob/bc682361/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 8d6cba2..a8da4fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -365,7 +365,7 @@ public class TaskExecutorTest extends TestLogger {
final TimerService<AllocationID> timerService = mock(TimerService.class);
final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), timerService);
final JobManagerTable jobManagerTable = new JobManagerTable();
- final JobLeaderService jobLeaderService = new JobLeaderService(resourceId);
+ final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
@@ -394,8 +394,7 @@ public class TaskExecutorTest extends TestLogger {
final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
when(jobMasterGateway.registerTaskManager(
- any(String.class),
- eq(resourceId),
+ eq(taskManagerLocation),
eq(jobManagerLeaderId),
any(Time.class)
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
@@ -466,7 +465,7 @@ public class TaskExecutorTest extends TestLogger {
final TimerService<AllocationID> timerService = mock(TimerService.class);
final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), mock(ResourceProfile.class)), timerService);
final JobManagerTable jobManagerTable = new JobManagerTable();
- final JobLeaderService jobLeaderService = new JobLeaderService(resourceId);
+ final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
final String resourceManagerAddress = "rm";
@@ -499,8 +498,7 @@ public class TaskExecutorTest extends TestLogger {
final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
when(jobMasterGateway.registerTaskManager(
- any(String.class),
- eq(resourceId),
+ eq(taskManagerLocation),
eq(jobManagerLeaderId),
any(Time.class)
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));