You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 20:56:12 UTC
flink git commit: [FLINK-4406] [cluster management] Implement job
master registration at resource manager
Repository: flink
Updated Branches:
refs/heads/flip-6 cf1e875d7 -> 626e67276
[FLINK-4406] [cluster management] Implement job master registration at resource manager
[FLINK-4406] [cluster management] Skip new connection if new resource manager's address and leader id are both not changing
[FLINK-4406] [cluster management] Verify registration response with leader id
This closes #2565.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/626e6727
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/626e6727
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/626e6727
Branch: refs/heads/flip-6
Commit: 626e67276410bc204f2dd2fb85c6f033cfb39ffb
Parents: cf1e875
Author: Kurt Young <yk...@gmail.com>
Authored: Thu Sep 29 08:56:27 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 22:55:40 2016 +0200
----------------------------------------------------------------------
.../runtime/jobmaster/JobManagerRunner.java | 8 +-
.../flink/runtime/jobmaster/JobMaster.java | 222 +++++++++++++++++--
.../runtime/jobmaster/JobMasterGateway.java | 17 +-
.../jobmaster/JobMasterRegistrationSuccess.java | 18 +-
.../JobMasterToResourceManagerConnection.java | 117 ----------
.../resourcemanager/ResourceManager.java | 2 +-
.../jobmaster/JobManagerRunnerMockTest.java | 25 ++-
7 files changed, 239 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/626e6727/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index bc2bf9a..6944d85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -63,9 +63,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
private final JobMaster jobManager;
- /** Leader session id when granted leadership */
- private UUID leaderSessionID;
-
/** flag marking the runner as shut down */
private volatile boolean shutdown;
@@ -93,7 +90,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
this.executionContext = rpcService.getExecutor();
this.checkpointRecoveryFactory = haServices.getCheckpointRecoveryFactory();
this.leaderElectionService = haServices.getJobMasterLeaderElectionService(jobGraph.getJobID());
- this.leaderSessionID = null;
this.jobManager = new JobMaster(
jobGraph, configuration, rpcService, haServices,
@@ -232,7 +228,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
// The operation may be blocking, but since this runner is idle before it been granted the leadership,
// it's okay that job manager wait for the operation complete
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
- this.leaderSessionID = leaderSessionID;
// Double check the leadership after we confirm that, there is a small chance that multiple
// job managers schedule the same job after if they try to recover at the same time.
@@ -242,7 +237,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID());
jobFinishedByOther();
} else {
- jobManager.getSelf().startJob();
+ jobManager.getSelf().startJob(leaderSessionID);
}
}
}
@@ -259,7 +254,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
log.info("JobManager for job {} ({}) was revoked leadership at {}.",
jobGraph.getName(), jobGraph.getJobID(), getAddress());
- leaderSessionID = null;
jobManager.getSelf().suspendJob(new Exception("JobManager is no longer the leader."));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/626e6727/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index b52a23c..1e01c55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
@@ -34,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -47,18 +49,26 @@ import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.registration.RegisteredRpcConnection;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.slf4j.Logger;
+
import scala.concurrent.ExecutionContext$;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -76,9 +86,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class JobMaster extends RpcEndpoint<JobMasterGateway> {
- /** Gateway to connected resource manager, null iff not connected */
- private ResourceManagerGateway resourceManager = null;
-
/** Logical representation of the job */
private final JobGraph jobGraph;
@@ -123,6 +130,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
private MetricGroup jobMetrics;
+ private volatile UUID leaderSessionID;
+
+ // --------- resource manager --------
+
+ /** Leader retriever service used to locate ResourceManager's address */
+ private LeaderRetrievalService resourceManagerLeaderRetriever;
+
+ /** Connection with the ResourceManager; null if its address has not been located yet or if we closed the connection ourselves */
+ private volatile ResourceManagerConnection resourceManagerConnection;
+
+ // ------------------------------------------------------------------------
+
public JobMaster(
JobGraph jobGraph,
Configuration configuration,
@@ -151,10 +170,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
this.jobCompletionActions = checkNotNull(jobCompletionActions);
}
- public ResourceManagerGateway getResourceManager() {
- return resourceManager;
- }
-
//----------------------------------------------------------------------------------------------
// Lifecycle management
//----------------------------------------------------------------------------------------------
@@ -196,7 +211,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
.getRestartStrategy();
if (restartStrategyConfiguration != null) {
restartStrategy = RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration);
- } else {
+ }
+ else {
restartStrategy = restartStrategyFactory.createRestartStrategy();
}
@@ -216,6 +232,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint recovery factory.", e);
}
+ try {
+ resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
+ } catch (Exception e) {
+ log.error("Could not get the resource manager leader retriever.", e);
+ throw new JobSubmissionException(jobGraph.getJobID(),
+ "Could not get the resource manager leader retriever.", e);
+ }
} catch (Throwable t) {
log.error("Failed to initializing job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
@@ -223,7 +246,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
if (t instanceof JobSubmissionException) {
throw (JobSubmissionException) t;
- } else {
+ }
+ else {
throw new JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " +
jobGraph.getName() + " (" + jobGraph.getJobID() + ")", t);
}
@@ -240,8 +264,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
super.shutDown();
suspendJob(new Exception("JobManager is shutting down."));
+
+ disposeCommunicationWithResourceManager();
}
+
+
//----------------------------------------------------------------------------------------------
// RPC methods
//----------------------------------------------------------------------------------------------
@@ -251,8 +279,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
* being recovered. After this, we will begin to schedule the job.
*/
@RpcMethod
- public void startJob() {
- log.info("Starting job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
+ public void startJob(final UUID leaderSessionID) {
+ log.info("Starting job {} ({}) with leaderId {}.", jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
+
+ this.leaderSessionID = leaderSessionID;
if (executionGraph != null) {
executionGraph = new ExecutionGraph(
@@ -267,7 +297,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
jobGraph.getClasspaths(),
userCodeLoader,
jobMetrics);
- } else {
+ }
+ else {
// TODO: update last active time in JobInfo
}
@@ -343,7 +374,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
final CheckpointStatsTracker checkpointStatsTracker;
if (isStatsDisabled) {
checkpointStatsTracker = new DisabledCheckpointStatsTracker();
- } else {
+ }
+ else {
int historySize = configuration.getInteger(
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
@@ -397,6 +429,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
*/
+ // job is good to go, try to locate resource manager's address
+ resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
} catch (Throwable t) {
log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
@@ -406,7 +440,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
final Throwable rt;
if (t instanceof JobExecutionException) {
rt = (JobExecutionException) t;
- } else {
+ }
+ else {
rt = new JobExecutionException(jobGraph.getJobID(),
"Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
}
@@ -439,10 +474,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
*/
@RpcMethod
public void suspendJob(final Throwable cause) {
+ leaderSessionID = null;
+
if (executionGraph != null) {
executionGraph.suspend(cause);
executionGraph = null;
}
+
+ disposeCommunicationWithResourceManager();
}
/**
@@ -457,14 +496,90 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
return Acknowledge.get();
}
- /**
- * Triggers the registration of the job master at the resource manager.
- *
- * @param address Address of the resource manager
- */
- @RpcMethod
- public void registerAtResourceManager(final String address) {
- //TODO:: register at the RM
+ //----------------------------------------------------------------------------------------------
+ // Internal methods
+ // ----------------------------------------------------------------------------------------------
+
+ private void handleFatalError(final Throwable cause) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
+ shutDown();
+ jobCompletionActions.onFatalError(cause);
+ }
+ });
+ }
+
+ private void notifyOfNewResourceManagerLeader(
+ final String resourceManagerAddress, final UUID resourceManagerLeaderId)
+ {
+ // IMPORTANT: executed by main thread to avoid concurrent access
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ if (resourceManagerConnection != null) {
+ if (resourceManagerAddress != null) {
+ if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
+ && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
+ {
+ // both address and leader id are not changed, we can keep the old connection
+ return;
+ }
+ log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
+ resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+ }
+ else {
+ log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
+ resourceManagerConnection.getTargetAddress());
+ }
+ }
+
+ closeResourceManagerConnection();
+
+ if (resourceManagerAddress != null) {
+ log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
+ resourceManagerConnection = new ResourceManagerConnection(
+ log, jobGraph.getJobID(), leaderSessionID,
+ resourceManagerAddress, resourceManagerLeaderId, executionContext);
+ resourceManagerConnection.start();
+ }
+ }
+ });
+ }
+
+ private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
+ getRpcService().execute(new Runnable() {
+ @Override
+ public void run() {
+ // TODO - add tests for comment in https://github.com/apache/flink/pull/2565
+ // verify the response with current connection
+ if (resourceManagerConnection != null
+ && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
+ log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
+ success.getResourceManagerLeaderId());
+ }
+ }
+ });
+ }
+
+ private void disposeCommunicationWithResourceManager() {
+ // 1. stop the leader retriever so we will not receive updates anymore
+ try {
+ resourceManagerLeaderRetriever.stop();
+ } catch (Exception e) {
+ log.warn("Failed to stop resource manager leader retriever.");
+ }
+
+ // 2. close current connection with ResourceManager if exists
+ closeResourceManagerConnection();
+ }
+
+ private void closeResourceManagerConnection() {
+ if (resourceManagerConnection != null) {
+ resourceManagerConnection.close();
+ resourceManagerConnection = null;
+ }
}
//----------------------------------------------------------------------------------------------
@@ -494,4 +609,67 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
}
return ret;
}
+
+ //----------------------------------------------------------------------------------------------
+ // Utility classes
+ //----------------------------------------------------------------------------------------------
+
+ private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
+ @Override
+ public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+ notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+ }
+
+ @Override
+ public void handleError(final Exception exception) {
+ handleFatalError(exception);
+ }
+ }
+
+ private class ResourceManagerConnection
+ extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess>
+ {
+ private final JobID jobID;
+
+ private final UUID jobManagerLeaderID;
+
+ ResourceManagerConnection(
+ final Logger log,
+ final JobID jobID,
+ final UUID jobManagerLeaderID,
+ final String resourceManagerAddress,
+ final UUID resourceManagerLeaderID,
+ final Executor executor)
+ {
+ super(log, resourceManagerAddress, resourceManagerLeaderID, executor);
+ this.jobID = checkNotNull(jobID);
+ this.jobManagerLeaderID = checkNotNull(jobManagerLeaderID);
+ }
+
+ @Override
+ protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
+ return new RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess>(
+ log, getRpcService(), "ResourceManager", ResourceManagerGateway.class,
+ getTargetAddress(), getTargetLeaderId())
+ {
+ @Override
+ protected Future<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, UUID leaderId,
+ long timeoutMillis) throws Exception
+ {
+ Time timeout = Time.milliseconds(timeoutMillis);
+ return gateway.registerJobMaster(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout);
+ }
+ };
+ }
+
+ @Override
+ protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
+ onResourceManagerRegistrationSuccess(success);
+ }
+
+ @Override
+ protected void onRegistrationFailure(final Throwable failure) {
+ handleFatalError(failure);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/626e6727/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b281ea8..6587ccb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -23,19 +23,21 @@ import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import java.util.UUID;
+
/**
* {@link JobMaster} rpc gateway interface
*/
public interface JobMasterGateway extends RpcGateway {
/**
- * Making this job begins to run.
+ * Starting the job under the given leader session ID.
*/
- void startJob();
+ void startJob(final UUID leaderSessionID);
/**
- * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared. Should re-submit
- * the job before restarting it.
+ * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared.
+ * Should re-submit the job before restarting it.
*
* @param cause The reason of why this job been suspended.
*/
@@ -48,11 +50,4 @@ public interface JobMasterGateway extends RpcGateway {
* @return Future acknowledge of the task execution state update
*/
Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
-
- /**
- * Triggers the registration of the job master at the resource manager.
- *
- * @param address Address of the resource manager
- */
- void registerAtResourceManager(final String address);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/626e6727/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
index 031c38e..4058452 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
@@ -20,6 +20,10 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.registration.RegistrationResponse;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Base class for responses from the ResourceManager to a registration attempt by a JobMaster.
*/
@@ -29,8 +33,11 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success {
private final long heartbeatInterval;
- public JobMasterRegistrationSuccess(long heartbeatInterval) {
+ private final UUID resourceManagerLeaderId;
+
+ public JobMasterRegistrationSuccess(final long heartbeatInterval, final UUID resourceManagerLeaderId) {
this.heartbeatInterval = heartbeatInterval;
+ this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
}
/**
@@ -42,8 +49,15 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success {
return heartbeatInterval;
}
+ public UUID getResourceManagerLeaderId() {
+ return resourceManagerLeaderId;
+ }
+
@Override
public String toString() {
- return "JobMasterRegistrationSuccess(" + heartbeatInterval + ')';
+ return "JobMasterRegistrationSuccess{" +
+ "heartbeatInterval=" + heartbeatInterval +
+ ", resourceManagerLeaderId=" + resourceManagerLeaderId +
+ '}';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/626e6727/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
deleted file mode 100644
index 71fce8c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.registration.RegisteredRpcConnection;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.registration.RetryingRegistration;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.concurrent.Future;
-
-import org.slf4j.Logger;
-
-import java.util.UUID;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The connection between a JobMaster and the ResourceManager.
- */
-public class JobMasterToResourceManagerConnection
- extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> {
-
- /** the JobMaster whose connection to the ResourceManager this represents */
- private final JobMaster jobMaster;
-
- private final JobID jobID;
-
- private final UUID jobMasterLeaderId;
-
- public JobMasterToResourceManagerConnection(
- Logger log,
- JobID jobID,
- JobMaster jobMaster,
- UUID jobMasterLeaderId,
- String resourceManagerAddress,
- UUID resourceManagerLeaderId,
- Executor executor) {
-
- super(log, resourceManagerAddress, resourceManagerLeaderId, executor);
- this.jobMaster = checkNotNull(jobMaster);
- this.jobID = checkNotNull(jobID);
- this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId);
- }
-
- @Override
- protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
- return new JobMasterToResourceManagerConnection.ResourceManagerRegistration(
- log, jobMaster.getRpcService(),
- getTargetAddress(), getTargetLeaderId(),
- jobMaster.getAddress(),jobID, jobMasterLeaderId);
- }
-
- @Override
- protected void onRegistrationSuccess(JobMasterRegistrationSuccess success) {
- }
-
- @Override
- protected void onRegistrationFailure(Throwable failure) {
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private static class ResourceManagerRegistration
- extends RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> {
-
- private final String jobMasterAddress;
-
- private final JobID jobID;
-
- private final UUID jobMasterLeaderId;
-
- ResourceManagerRegistration(
- Logger log,
- RpcService rpcService,
- String targetAddress,
- UUID leaderId,
- String jobMasterAddress,
- JobID jobID,
- UUID jobMasterLeaderId) {
-
- super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId);
- this.jobMasterAddress = checkNotNull(jobMasterAddress);
- this.jobID = checkNotNull(jobID);
- this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId);
- }
-
- @Override
- protected Future<RegistrationResponse> invokeRegistration(
- ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception {
-
- Time timeout = Time.milliseconds(timeoutMillis);
- return gateway.registerJobMaster(leaderId, jobMasterLeaderId,jobMasterAddress, jobID, timeout);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/626e6727/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 190a4de..f695de4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -215,7 +215,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
if (existingGateway != null) {
log.info("Replacing gateway for registered JobID {}.", jobID);
}
- return new JobMasterRegistrationSuccess(5000);
+ return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);
}
}
}, getMainThreadExecutor());
http://git-wip-us.apache.org/repos/asf/flink/blob/626e6727/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index dc3b5fd..bfe5f55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -142,8 +142,9 @@ public class JobManagerRunnerMockTest {
public void testJobFinished() throws Exception {
runner.start();
- runner.grantLeadership(UUID.randomUUID());
- verify(jobManagerGateway).startJob();
+ UUID leaderSessionID = UUID.randomUUID();
+ runner.grantLeadership(leaderSessionID);
+ verify(jobManagerGateway).startJob(leaderSessionID);
assertTrue(!jobCompletion.isJobFinished());
// runner been told by JobManager that job is finished
@@ -160,8 +161,9 @@ public class JobManagerRunnerMockTest {
public void testJobFailed() throws Exception {
runner.start();
- runner.grantLeadership(UUID.randomUUID());
- verify(jobManagerGateway).startJob();
+ UUID leaderSessionID = UUID.randomUUID();
+ runner.grantLeadership(leaderSessionID);
+ verify(jobManagerGateway).startJob(leaderSessionID);
assertTrue(!jobCompletion.isJobFinished());
// runner been told by JobManager that job is failed
@@ -177,8 +179,9 @@ public class JobManagerRunnerMockTest {
public void testLeadershipRevoked() throws Exception {
runner.start();
- runner.grantLeadership(UUID.randomUUID());
- verify(jobManagerGateway).startJob();
+ UUID leaderSessionID = UUID.randomUUID();
+ runner.grantLeadership(leaderSessionID);
+ verify(jobManagerGateway).startJob(leaderSessionID);
assertTrue(!jobCompletion.isJobFinished());
runner.revokeLeadership();
@@ -190,16 +193,18 @@ public class JobManagerRunnerMockTest {
public void testRegainLeadership() throws Exception {
runner.start();
- runner.grantLeadership(UUID.randomUUID());
- verify(jobManagerGateway).startJob();
+ UUID leaderSessionID = UUID.randomUUID();
+ runner.grantLeadership(leaderSessionID);
+ verify(jobManagerGateway).startJob(leaderSessionID);
assertTrue(!jobCompletion.isJobFinished());
runner.revokeLeadership();
verify(jobManagerGateway).suspendJob(any(Throwable.class));
assertFalse(runner.isShutdown());
- runner.grantLeadership(UUID.randomUUID());
- verify(jobManagerGateway, times(2)).startJob();
+ UUID leaderSessionID2 = UUID.randomUUID();
+ runner.grantLeadership(leaderSessionID2);
+ verify(jobManagerGateway, times(2)).startJob(leaderSessionID2);
}
private static class TestingOnCompletionActions implements OnCompletionActions {