You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:54 UTC

[49/50] [abbrv] flink git commit: [FLINK-4406] [cluster management] Implement job master registration at resource manager

[FLINK-4406] [cluster management] Implement job master registration at resource manager

[FLINK-4406] [cluster management] Skip new connection if new resource manager's address and leader id are both not changing

[FLINK-4406] [cluster management] Verify registration response with leader id

This closes #2565.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba2b5909
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba2b5909
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba2b5909

Branch: refs/heads/flip-6
Commit: ba2b59096022b480a70f6410d9b1643da5158608
Parents: 009ba35
Author: Kurt Young <yk...@gmail.com>
Authored: Thu Sep 29 08:56:27 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:46:59 2016 +0200

----------------------------------------------------------------------
 .../runtime/jobmaster/JobManagerRunner.java     |   8 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 222 +++++++++++++++++--
 .../runtime/jobmaster/JobMasterGateway.java     |  17 +-
 .../jobmaster/JobMasterRegistrationSuccess.java |  18 +-
 .../JobMasterToResourceManagerConnection.java   | 117 ----------
 .../resourcemanager/ResourceManager.java        |   2 +-
 .../jobmaster/JobManagerRunnerMockTest.java     |  25 ++-
 7 files changed, 239 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba2b5909/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index bc2bf9a..6944d85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -63,9 +63,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 
 	private final JobMaster jobManager;
 
-	/** Leader session id when granted leadership */
-	private UUID leaderSessionID;
-
 	/** flag marking the runner as shut down */
 	private volatile boolean shutdown;
 
@@ -93,7 +90,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 		this.executionContext = rpcService.getExecutor();
 		this.checkpointRecoveryFactory = haServices.getCheckpointRecoveryFactory();
 		this.leaderElectionService = haServices.getJobMasterLeaderElectionService(jobGraph.getJobID());
-		this.leaderSessionID = null;
 
 		this.jobManager = new JobMaster(
 			jobGraph, configuration, rpcService, haServices,
@@ -232,7 +228,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 			// The operation may be blocking, but since this runner is idle before it been granted the leadership,
 			// it's okay that job manager wait for the operation complete
 			leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-			this.leaderSessionID = leaderSessionID;
 
 			// Double check the leadership after we confirm that, there is a small chance that multiple
 			// job managers schedule the same job after if they try to recover at the same time.
@@ -242,7 +237,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 					log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID());
 					jobFinishedByOther();
 				} else {
-					jobManager.getSelf().startJob();
+					jobManager.getSelf().startJob(leaderSessionID);
 				}
 			}
 		}
@@ -259,7 +254,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions {
 			log.info("JobManager for job {} ({}) was revoked leadership at {}.",
 				jobGraph.getName(), jobGraph.getJobID(), getAddress());
 
-			leaderSessionID = null;
 			jobManager.getSelf().suspendJob(new Exception("JobManager is no longer the leader."));
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2b5909/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index b52a23c..1e01c55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
@@ -34,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -47,18 +49,26 @@ import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.registration.RegisteredRpcConnection;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.slf4j.Logger;
+
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -76,9 +86,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
-	/** Gateway to connected resource manager, null iff not connected */
-	private ResourceManagerGateway resourceManager = null;
-
 	/** Logical representation of the job */
 	private final JobGraph jobGraph;
 
@@ -123,6 +130,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	private MetricGroup jobMetrics;
 
+	private volatile UUID leaderSessionID;
+
+	// --------- resource manager --------
+
+	/** Leader retriever service used to locate ResourceManager's address */
+	private LeaderRetrievalService resourceManagerLeaderRetriever;
+
+	/** Connection with ResourceManager, null if not located address yet or we close it initiative */
+	private volatile ResourceManagerConnection resourceManagerConnection;
+
+	// ------------------------------------------------------------------------
+
 	public JobMaster(
 		JobGraph jobGraph,
 		Configuration configuration,
@@ -151,10 +170,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
 	}
 
-	public ResourceManagerGateway getResourceManager() {
-		return resourceManager;
-	}
-
 	//----------------------------------------------------------------------------------------------
 	// Lifecycle management
 	//----------------------------------------------------------------------------------------------
@@ -196,7 +211,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 					.getRestartStrategy();
 			if (restartStrategyConfiguration != null) {
 				restartStrategy = RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration);
-			} else {
+			}
+			else {
 				restartStrategy = restartStrategyFactory.createRestartStrategy();
 			}
 
@@ -216,6 +232,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint recovery factory.", e);
 			}
 
+			try {
+				resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
+			} catch (Exception e) {
+				log.error("Could not get the resource manager leader retriever.", e);
+				throw new JobSubmissionException(jobGraph.getJobID(),
+					"Could not get the resource manager leader retriever.", e);
+			}
 		} catch (Throwable t) {
 			log.error("Failed to initializing job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
 
@@ -223,7 +246,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 			if (t instanceof JobSubmissionException) {
 				throw (JobSubmissionException) t;
-			} else {
+			}
+			else {
 				throw new JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " +
 					jobGraph.getName() + " (" + jobGraph.getJobID() + ")", t);
 			}
@@ -240,8 +264,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		super.shutDown();
 
 		suspendJob(new Exception("JobManager is shutting down."));
+
+		disposeCommunicationWithResourceManager();
 	}
 
+
+
 	//----------------------------------------------------------------------------------------------
 	// RPC methods
 	//----------------------------------------------------------------------------------------------
@@ -251,8 +279,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 * being recovered. After this, we will begin to schedule the job.
 	 */
 	@RpcMethod
-	public void startJob() {
-		log.info("Starting job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
+	public void startJob(final UUID leaderSessionID) {
+		log.info("Starting job {} ({}) with leaderId {}.", jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
+
+		this.leaderSessionID = leaderSessionID;
 
 		if (executionGraph != null) {
 			executionGraph = new ExecutionGraph(
@@ -267,7 +297,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				jobGraph.getClasspaths(),
 				userCodeLoader,
 				jobMetrics);
-		} else {
+		}
+		else {
 			// TODO: update last active time in JobInfo
 		}
 
@@ -343,7 +374,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				final CheckpointStatsTracker checkpointStatsTracker;
 				if (isStatsDisabled) {
 					checkpointStatsTracker = new DisabledCheckpointStatsTracker();
-				} else {
+				}
+				else {
 					int historySize = configuration.getInteger(
 						ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
 						ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
@@ -397,6 +429,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			}
 			*/
 
+			// job is good to go, try to locate resource manager's address
+			resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
 		} catch (Throwable t) {
 			log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
 
@@ -406,7 +440,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			final Throwable rt;
 			if (t instanceof JobExecutionException) {
 				rt = (JobExecutionException) t;
-			} else {
+			}
+			else {
 				rt = new JobExecutionException(jobGraph.getJobID(),
 					"Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
 			}
@@ -439,10 +474,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 */
 	@RpcMethod
 	public void suspendJob(final Throwable cause) {
+		leaderSessionID = null;
+
 		if (executionGraph != null) {
 			executionGraph.suspend(cause);
 			executionGraph = null;
 		}
+
+		disposeCommunicationWithResourceManager();
 	}
 
 	/**
@@ -457,14 +496,90 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		return Acknowledge.get();
 	}
 
-	/**
-	 * Triggers the registration of the job master at the resource manager.
-	 *
-	 * @param address Address of the resource manager
-	 */
-	@RpcMethod
-	public void registerAtResourceManager(final String address) {
-		//TODO:: register at the RM
+	//----------------------------------------------------------------------------------------------\u2028
+	// Internal methods\u2028
+	// ----------------------------------------------------------------------------------------------\u2028\u2028
+
+	private void handleFatalError(final Throwable cause) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
+				shutDown();
+				jobCompletionActions.onFatalError(cause);
+			}
+		});
+	}
+
+	private void notifyOfNewResourceManagerLeader(
+		final String resourceManagerAddress, final UUID resourceManagerLeaderId)
+	{
+		// IMPORTANT: executed by main thread to avoid concurrence
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				if (resourceManagerConnection != null) {
+					if (resourceManagerAddress != null) {
+						if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
+							&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
+						{
+							// both address and leader id are not changed, we can keep the old connection
+							return;
+						}
+						log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
+							resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+					}
+					else {
+						log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
+							resourceManagerConnection.getTargetAddress());
+					}
+				}
+
+				closeResourceManagerConnection();
+
+				if (resourceManagerAddress != null) {
+					log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
+					resourceManagerConnection = new ResourceManagerConnection(
+						log, jobGraph.getJobID(), leaderSessionID,
+						resourceManagerAddress, resourceManagerLeaderId, executionContext);
+					resourceManagerConnection.start();
+				}
+			}
+		});
+	}
+
+	private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
+		getRpcService().execute(new Runnable() {
+			@Override
+			public void run() {
+				// TODO - add tests for comment in https://github.com/apache/flink/pull/2565
+				// verify the response with current connection
+				if (resourceManagerConnection != null
+					&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
+					log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
+						success.getResourceManagerLeaderId());
+				}
+			}
+		});
+	}
+
+	private void disposeCommunicationWithResourceManager() {
+		// 1. stop the leader retriever so we will not receiving updates anymore
+		try {
+			resourceManagerLeaderRetriever.stop();
+		} catch (Exception e) {
+			log.warn("Failed to stop resource manager leader retriever.");
+		}
+
+		// 2. close current connection with ResourceManager if exists
+		closeResourceManagerConnection();
+	}
+
+	private void closeResourceManagerConnection() {
+		if (resourceManagerConnection != null) {
+			resourceManagerConnection.close();
+			resourceManagerConnection = null;
+		}
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -494,4 +609,67 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		}
 		return ret;
 	}
+
+	//----------------------------------------------------------------------------------------------
+	// Utility classes
+	//----------------------------------------------------------------------------------------------
+
+	private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
+		@Override
+		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
+			notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+		}
+
+		@Override
+		public void handleError(final Exception exception) {
+			handleFatalError(exception);
+		}
+	}
+
+	private class ResourceManagerConnection
+		extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess>
+	{
+		private final JobID jobID;
+
+		private final UUID jobManagerLeaderID;
+
+		ResourceManagerConnection(
+			final Logger log,
+			final JobID jobID,
+			final UUID jobManagerLeaderID,
+			final String resourceManagerAddress,
+			final UUID resourceManagerLeaderID,
+			final Executor executor)
+		{
+			super(log, resourceManagerAddress, resourceManagerLeaderID, executor);
+			this.jobID = checkNotNull(jobID);
+			this.jobManagerLeaderID = checkNotNull(jobManagerLeaderID);
+		}
+
+		@Override
+		protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
+			return new RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess>(
+				log, getRpcService(), "ResourceManager", ResourceManagerGateway.class,
+				getTargetAddress(), getTargetLeaderId())
+			{
+				@Override
+				protected Future<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, UUID leaderId,
+					long timeoutMillis) throws Exception
+				{
+					Time timeout = Time.milliseconds(timeoutMillis);
+					return gateway.registerJobMaster(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout);
+				}
+			};
+		}
+
+		@Override
+		protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
+			onResourceManagerRegistrationSuccess(success);
+		}
+
+		@Override
+		protected void onRegistrationFailure(final Throwable failure) {
+			handleFatalError(failure);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2b5909/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b281ea8..6587ccb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -23,19 +23,21 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
+import java.util.UUID;
+
 /**
  * {@link JobMaster} rpc gateway interface
  */
 public interface JobMasterGateway extends RpcGateway {
 
 	/**
-	 * Making this job begins to run.
+	 * Starting the job under the given leader session ID.
 	 */
-	void startJob();
+	void startJob(final UUID leaderSessionID);
 
 	/**
-	 * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared. Should re-submit
-	 * the job before restarting it.
+	 * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared.
+	 * Should re-submit the job before restarting it.
 	 *
 	 * @param cause The reason of why this job been suspended.
 	 */
@@ -48,11 +50,4 @@ public interface JobMasterGateway extends RpcGateway {
 	 * @return Future acknowledge of the task execution state update
 	 */
 	Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
-
-	/**
-	 * Triggers the registration of the job master at the resource manager.
-	 *
-	 * @param address Address of the resource manager
-	 */
-	void registerAtResourceManager(final String address);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2b5909/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
index 031c38e..4058452 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
@@ -20,6 +20,10 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.registration.RegistrationResponse;
 
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Base class for responses from the ResourceManager to a registration attempt by a JobMaster.
  */
@@ -29,8 +33,11 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success {
 
 	private final long heartbeatInterval;
 
-	public JobMasterRegistrationSuccess(long heartbeatInterval) {
+	private final UUID resourceManagerLeaderId;
+
+	public JobMasterRegistrationSuccess(final long heartbeatInterval, final UUID resourceManagerLeaderId) {
 		this.heartbeatInterval = heartbeatInterval;
+		this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
 	}
 
 	/**
@@ -42,8 +49,15 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success {
 		return heartbeatInterval;
 	}
 
+	public UUID getResourceManagerLeaderId() {
+		return resourceManagerLeaderId;
+	}
+
 	@Override
 	public String toString() {
-		return "JobMasterRegistrationSuccess(" + heartbeatInterval + ')';
+		return "JobMasterRegistrationSuccess{" +
+			"heartbeatInterval=" + heartbeatInterval +
+			", resourceManagerLeaderId=" + resourceManagerLeaderId +
+			'}';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2b5909/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
deleted file mode 100644
index 71fce8c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.registration.RegisteredRpcConnection;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.registration.RetryingRegistration;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.concurrent.Future;
-
-import org.slf4j.Logger;
-
-import java.util.UUID;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The connection between a JobMaster and the ResourceManager.
- */
-public class JobMasterToResourceManagerConnection 
-		extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> {
-
-	/** the JobMaster whose connection to the ResourceManager this represents */
-	private final JobMaster jobMaster;
-
-	private final JobID jobID;
-
-	private final UUID jobMasterLeaderId;
-
-	public JobMasterToResourceManagerConnection(
-			Logger log,
-			JobID jobID,
-			JobMaster jobMaster,
-			UUID jobMasterLeaderId,
-			String resourceManagerAddress,
-			UUID resourceManagerLeaderId,
-			Executor executor) {
-
-		super(log, resourceManagerAddress, resourceManagerLeaderId, executor);
-		this.jobMaster = checkNotNull(jobMaster);
-		this.jobID = checkNotNull(jobID);
-		this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId);
-	}
-
-	@Override
-	protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
-		return new JobMasterToResourceManagerConnection.ResourceManagerRegistration(
-			log, jobMaster.getRpcService(),
-			getTargetAddress(), getTargetLeaderId(),
-			jobMaster.getAddress(),jobID, jobMasterLeaderId);
-	}
-
-	@Override
-	protected void onRegistrationSuccess(JobMasterRegistrationSuccess success) {
-	}
-
-	@Override
-	protected void onRegistrationFailure(Throwable failure) {
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private static class ResourceManagerRegistration
-		extends RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> {
-
-		private final String jobMasterAddress;
-
-		private final JobID jobID;
-
-		private final UUID jobMasterLeaderId;
-
-		ResourceManagerRegistration(
-			Logger log,
-			RpcService rpcService,
-			String targetAddress,
-			UUID leaderId,
-			String jobMasterAddress,
-			JobID jobID,
-			UUID jobMasterLeaderId) {
-
-			super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId);
-			this.jobMasterAddress = checkNotNull(jobMasterAddress);
-			this.jobID = checkNotNull(jobID);
-			this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId);
-		}
-
-		@Override
-		protected Future<RegistrationResponse> invokeRegistration(
-			ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception {
-
-			Time timeout = Time.milliseconds(timeoutMillis);
-			return gateway.registerJobMaster(leaderId, jobMasterLeaderId,jobMasterAddress, jobID, timeout);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2b5909/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 190a4de..f695de4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -215,7 +215,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 						if (existingGateway != null) {
 							log.info("Replacing gateway for registered JobID {}.", jobID);
 						}
-						return new JobMasterRegistrationSuccess(5000);
+						return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);
 					}
 				}
 			}, getMainThreadExecutor());

http://git-wip-us.apache.org/repos/asf/flink/blob/ba2b5909/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index dc3b5fd..bfe5f55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -142,8 +142,9 @@ public class JobManagerRunnerMockTest {
 	public void testJobFinished() throws Exception {
 		runner.start();
 
-		runner.grantLeadership(UUID.randomUUID());
-		verify(jobManagerGateway).startJob();
+		UUID leaderSessionID = UUID.randomUUID();
+		runner.grantLeadership(leaderSessionID);
+		verify(jobManagerGateway).startJob(leaderSessionID);
 		assertTrue(!jobCompletion.isJobFinished());
 
 		// runner been told by JobManager that job is finished
@@ -160,8 +161,9 @@ public class JobManagerRunnerMockTest {
 	public void testJobFailed() throws Exception {
 		runner.start();
 
-		runner.grantLeadership(UUID.randomUUID());
-		verify(jobManagerGateway).startJob();
+		UUID leaderSessionID = UUID.randomUUID();
+		runner.grantLeadership(leaderSessionID);
+		verify(jobManagerGateway).startJob(leaderSessionID);
 		assertTrue(!jobCompletion.isJobFinished());
 
 		// runner been told by JobManager that job is failed
@@ -177,8 +179,9 @@ public class JobManagerRunnerMockTest {
 	public void testLeadershipRevoked() throws Exception {
 		runner.start();
 
-		runner.grantLeadership(UUID.randomUUID());
-		verify(jobManagerGateway).startJob();
+		UUID leaderSessionID = UUID.randomUUID();
+		runner.grantLeadership(leaderSessionID);
+		verify(jobManagerGateway).startJob(leaderSessionID);
 		assertTrue(!jobCompletion.isJobFinished());
 
 		runner.revokeLeadership();
@@ -190,16 +193,18 @@ public class JobManagerRunnerMockTest {
 	public void testRegainLeadership() throws Exception {
 		runner.start();
 
-		runner.grantLeadership(UUID.randomUUID());
-		verify(jobManagerGateway).startJob();
+		UUID leaderSessionID = UUID.randomUUID();
+		runner.grantLeadership(leaderSessionID);
+		verify(jobManagerGateway).startJob(leaderSessionID);
 		assertTrue(!jobCompletion.isJobFinished());
 
 		runner.revokeLeadership();
 		verify(jobManagerGateway).suspendJob(any(Throwable.class));
 		assertFalse(runner.isShutdown());
 
-		runner.grantLeadership(UUID.randomUUID());
-		verify(jobManagerGateway, times(2)).startJob();
+		UUID leaderSessionID2 = UUID.randomUUID();
+		runner.grantLeadership(leaderSessionID2);
+		verify(jobManagerGateway, times(2)).startJob(leaderSessionID2);
 	}
 
 	private static class TestingOnCompletionActions implements OnCompletionActions {