You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:45 UTC

[26/52] [abbrv] flink git commit: [FLINK-4986] Improvements to the JobMaster

[FLINK-4986] Improvements to the JobMaster


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8730e200
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8730e200
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8730e200

Branch: refs/heads/master
Commit: 8730e200864427dd5d6ddb9f841978d68ab452bd
Parents: 91f1d09
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 17 20:26:58 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:25 2016 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  15 +
 .../flink/runtime/jobmaster/JobMaster.java      | 336 ++++++++++---------
 2 files changed, 184 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8730e200/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cbb4c7e..2025fc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -621,6 +621,21 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	/**
+	 * Gets the accumulators from {@code aggregateUserAccumulators()}, mapping each key to that accumulator's local value.
+	 */
+	public Map<String, Object> getAccumulators() throws IOException {
+
+		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
+
+		Map<String, Object> result = new HashMap<>();
+		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
+			result.put(entry.getKey(), entry.getValue().getLocalValue());
+		}
+
+		return result;
+	}
+
+	/**
 	 * Gets a serialized accumulator map.
 	 * @return The accumulator map with serialized accumulator values.
 	 * @throws IOException

http://git-wip-us.apache.org/repos/asf/flink/blob/8730e200/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 458bf0c..0b3b68e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotDescriptor;
 import org.apache.flink.runtime.instance.SlotPool;
@@ -90,7 +91,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -246,6 +246,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				-1,
 				log);
 
+		// register self as job status change listener
+		executionGraph.registerJobStatusListener(new JobManagerJobStatusListener());
+
 		this.slotPool = new SlotPool(executorService);
 		this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
 
@@ -269,13 +272,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 */
 	public void start(final UUID leaderSessionID) throws Exception {
 		if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
-			super.start();
 
+			// make sure the slot pool now accepts messages for this leader  
 			slotPool.setJobManagerLeaderId(leaderSessionID);
-			log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID());
+
+			// make sure we receive RPC and async calls
+			super.start();
+
+			log.info("JobManager started as leader {} for job {}", leaderSessionID, jobGraph.getJobID());
 			getSelf().startJobExecution();
-		} else {
-			log.warn("Job already started with leaderId {}, ignoring this start request.", leaderSessionID);
+		}
+		else {
+			log.warn("Job already started with leader ID {}, ignoring this start request.", leaderSessionID);
 		}
 	}
 
@@ -297,48 +305,21 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	@RpcMethod
 	public void startJobExecution() {
-		log.info("Starting execution of job {} ({}) with leaderId {}.",
-				jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
+		log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());
 
 		try {
-			// register self as job status change listener
-			executionGraph.registerJobStatusListener(new JobStatusListener() {
-				@Override
-				public void jobStatusChanges(
-						final JobID jobId, final JobStatus newJobStatus, final long timestamp, final Throwable error)
-				{
-					// run in rpc thread to avoid concurrency
-					runAsync(new Runnable() {
-						@Override
-						public void run() {
-							jobStatusChanged(newJobStatus, timestamp, error);
-						}
-					});
-				}
-			});
-
 			// job is ready to go, try to establish connection with resource manager
+			//   - activate leader retrieval for the resource manager
+			//   - on notification of the leader, the connection will be established and
+			//     the slot pool will start requesting slots
 			resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
-		} catch (Throwable t) {
-
-			// TODO - this should not result in a job failure, but another leader should take over
-			// TODO - either this master should retry the execution, or it should relinquish leadership / terminate
-
+		}
+		catch (Throwable t) {
 			log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
 
-			executionGraph.fail(t);
-
-			final JobExecutionException rt;
-			if (t instanceof JobExecutionException) {
-				rt = (JobExecutionException) t;
-			} else {
-				rt = new JobExecutionException(jobGraph.getJobID(),
-						"Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
-			}
-
-			// TODO: notify client about this failure
+			handleFatalError(new Exception(
+					"Could not start job execution: Failed to start leader service for Resource Manager", t));
 
-			jobCompletionActions.jobFailed(rt);
 			return;
 		}
 
@@ -348,7 +329,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			public void run() {
 				try {
 					executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout));
-				} catch (Throwable t) {
+				}
+				catch (Throwable t) {
 					executionGraph.fail(t);
 				}
 			}
@@ -386,6 +368,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		}
 		closeResourceManagerConnection();
 
+		// TODO: in the future, the slot pool should not release the resources, so that
+		// TODO: the TaskManagers offer the resources to the new leader 
 		for (ResourceID taskManagerId : registeredTaskManagers.keySet()) {
 			slotPool.releaseResource(taskManagerId);
 		}
@@ -405,14 +389,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			final UUID leaderSessionID,
 			final TaskExecutionState taskExecutionState) throws Exception
 	{
-		if (taskExecutionState == null) {
-			throw new NullPointerException("TaskExecutionState must not be null.");
-		}
-
-		if (!this.leaderSessionID.equals(leaderSessionID)) {
-			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
-					+ ", actual: " + leaderSessionID);
-		}
+		checkNotNull(taskExecutionState, "taskExecutionState");
+		validateLeaderSessionId(leaderSessionID);
 
 		if (executionGraph.updateState(taskExecutionState)) {
 			return Acknowledge.get();
@@ -428,10 +406,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			final JobVertexID vertexID,
 			final ExecutionAttemptID executionAttempt) throws Exception
 	{
-		if (!this.leaderSessionID.equals(leaderSessionID)) {
-			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
-					+ ", actual: " + leaderSessionID);
-		}
+		validateLeaderSessionId(leaderSessionID);
 
 		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
 		if (execution == null) {
@@ -477,16 +452,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		}
 	}
 
-	@RpcMethod
 	public ExecutionState requestPartitionState(
 			final UUID leaderSessionID,
 			final IntermediateDataSetID intermediateResultId,
 			final ResultPartitionID resultPartitionId) throws Exception {
 
-		if (!this.leaderSessionID.equals(leaderSessionID)) {
-			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
-					+ ", actual: " + leaderSessionID);
-		}
+		validateLeaderSessionId(leaderSessionID);
 
 		final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
 		if (execution != null) {
@@ -520,10 +491,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			final UUID leaderSessionID,
 			final ResultPartitionID partitionID) throws Exception
 	{
-		if (!this.leaderSessionID.equals(leaderSessionID)) {
-			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
-					+ ", actual: " + leaderSessionID);
-		}
+		validateLeaderSessionId(leaderSessionID);
 
 		executionGraph.scheduleOrUpdateConsumers(partitionID);
 		return Acknowledge.get();
@@ -534,6 +502,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		throw new UnsupportedOperationException();
 	}
 
+	// TODO: This method needs a leader session ID
 	@RpcMethod
 	public void acknowledgeCheckpoint(
 			final JobID jobID,
@@ -562,6 +531,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		}
 	}
 
+	// TODO: This method needs a leader session ID
 	@RpcMethod
 	public void declineCheckpoint(
 			final JobID jobID,
@@ -657,10 +627,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	public Iterable<SlotOffer> offerSlots(final ResourceID taskManagerId,
 			final Iterable<SlotOffer> slots, final UUID leaderId) throws Exception
 	{
-		if (!this.leaderSessionID.equals(leaderId)) {
-			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
-					+ ", actual: " + leaderId);
-		}
+		validateLeaderSessionId(leaderId); // validate the caller-supplied ID, not our own field
 
 		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
 		if (taskManager == null) {
@@ -689,10 +656,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			final UUID leaderId,
 			final Exception cause) throws Exception
 	{
-		if (!this.leaderSessionID.equals(leaderId)) {
-			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
-					+ ", actual: " + leaderId);
-		}
+		validateLeaderSessionId(leaderId); // validate the caller-supplied ID, not our own field
 
 		if (!registeredTaskManagers.containsKey(taskManagerId)) {
 			throw new Exception("Unknown TaskManager " + taskManagerId);
@@ -782,62 +746,55 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
+		validateRunsInMainThread();
+
 		final JobID jobID = executionGraph.getJobID();
 		final String jobName = executionGraph.getJobName();
+
 		log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
 
 		if (newJobStatus.isGloballyTerminalState()) {
-			// TODO set job end time in JobInfo
-
-			/*
-			  TODO
-			  if (jobInfo.sessionAlive) {
-                jobInfo.setLastActive()
-                val lastActivity = jobInfo.lastActive
-                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
-                  // remove only if no activity occurred in the meantime
-                  if (lastActivity == jobInfo.lastActive) {
-                    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
-                  }
-                }(context.dispatcher)
-              } else {
-                self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
-              }
-			 */
-
-			if (newJobStatus == JobStatus.FINISHED) {
-				try {
-					final Map<String, SerializedValue<Object>> accumulatorResults =
-							executionGraph.getAccumulatorsSerialized();
-					final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
-							jobID, 0, accumulatorResults // TODO get correct job duration
-					);
-					jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
-				} catch (Exception e) {
-					log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
+			switch (newJobStatus) {
+				case FINISHED:
+					try {
+						// TODO get correct job duration
+						// job done, let's get the accumulators
+						Map<String, Object> accumulatorResults = executionGraph.getAccumulators();
+						JobExecutionResult result = new JobExecutionResult(jobID, 0L, accumulatorResults); 
+
+						jobCompletionActions.jobFinished(result);
+					}
+					catch (Exception e) {
+						log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
+
+						final JobExecutionException exception = new JobExecutionException(jobID, 
+								"Failed to retrieve accumulator results. " +
+								"The job is registered as 'FINISHED (successful)', but this notification describes " +
+								"a failure, since the resulting accumulators could not be fetched.", e);
+
+						jobCompletionActions.jobFailed(exception);
+					}
+					break;
+
+				case CANCELED: {
 					final JobExecutionException exception = new JobExecutionException(
-							jobID, "Failed to retrieve accumulator results.", e);
-					// TODO should we also notify client?
+						jobID, "Job was cancelled.", new Exception("The job was cancelled"));
+
 					jobCompletionActions.jobFailed(exception);
+					break;
 				}
-			} else if (newJobStatus == JobStatus.CANCELED) {
-				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
-				final JobExecutionException exception = new JobExecutionException(
-						jobID, "Job was cancelled.", unpackedError);
-				// TODO should we also notify client?
-				jobCompletionActions.jobFailed(exception);
-			} else if (newJobStatus == JobStatus.FAILED) {
-				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
-				final JobExecutionException exception = new JobExecutionException(
-						jobID, "Job execution failed.", unpackedError);
-				// TODO should we also notify client?
-				jobCompletionActions.jobFailed(exception);
-			} else {
-				final JobExecutionException exception = new JobExecutionException(
-						jobID, newJobStatus + " is not a terminal state.");
-				// TODO should we also notify client?
-				jobCompletionActions.jobFailed(exception);
-				throw new RuntimeException(exception);
+
+				case FAILED: {
+					final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
+					final JobExecutionException exception = new JobExecutionException(
+							jobID, "Job execution failed.", unpackedError);
+					jobCompletionActions.jobFailed(exception);
+					break;
+				}
+
+				default:
+					// this can happen only if the enum is buggy
+					throw new IllegalStateException(newJobStatus.toString());
 			}
 		}
 	}
@@ -845,57 +802,52 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	private void notifyOfNewResourceManagerLeader(
 			final String resourceManagerAddress, final UUID resourceManagerLeaderId)
 	{
-		// IMPORTANT: executed by main thread to avoid concurrence
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				if (resourceManagerConnection != null) {
-					if (resourceManagerAddress != null) {
-						if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
-								&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
-							// both address and leader id are not changed, we can keep the old connection
-							return;
-						}
-						log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-								resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
-					} else {
-						log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
-								resourceManagerConnection.getTargetAddress());
-					}
-				}
+		validateRunsInMainThread();
 
-				closeResourceManagerConnection();
-
-				if (resourceManagerAddress != null) {
-					log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
-					resourceManagerConnection = new ResourceManagerConnection(
-							log, jobGraph.getJobID(), leaderSessionID,
-							resourceManagerAddress, resourceManagerLeaderId, executionContext);
-					resourceManagerConnection.start();
+		if (resourceManagerConnection != null) {
+			if (resourceManagerAddress != null) {
+				if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
+						&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
+					// both address and leader id are not changed, we can keep the old connection
+					return;
 				}
+				log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
+						resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+			} else {
+				log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
+						resourceManagerConnection.getTargetAddress());
 			}
-		});
+		}
+
+		closeResourceManagerConnection();
+
+		if (resourceManagerAddress != null) {
+			log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
+			resourceManagerConnection = new ResourceManagerConnection(
+					log, jobGraph.getJobID(), getAddress(), leaderSessionID,
+					resourceManagerAddress, resourceManagerLeaderId, executionContext);
+			resourceManagerConnection.start();
+		}
 	}
 
 	private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
-		getRpcService().execute(new Runnable() {
-			@Override
-			public void run() {
-				// TODO - add tests for comment in https://github.com/apache/flink/pull/2565
-				// verify the response with current connection
-				if (resourceManagerConnection != null
-						&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
-				{
-					log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
-							success.getResourceManagerLeaderId());
-					slotPool.setResourceManager(success.getResourceManagerLeaderId(),
-							resourceManagerConnection.getTargetGateway());
-				}
-			}
-		});
+		validateRunsInMainThread();
+	
+		// verify the response with current connection
+		if (resourceManagerConnection != null
+				&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
+		{
+			log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
+					success.getResourceManagerLeaderId());
+
+			slotPool.setResourceManager(
+					success.getResourceManagerLeaderId(), resourceManagerConnection.getTargetGateway());
+		}
 	}
 
 	private void closeResourceManagerConnection() {
+		validateRunsInMainThread();
+
 		if (resourceManagerConnection != null) {
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
@@ -903,32 +855,49 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		slotPool.disconnectResourceManager();
 	}
 
+	private void validateLeaderSessionId(UUID leaderSessionID) throws LeaderIdMismatchException {
+		if (this.leaderSessionID == null || !this.leaderSessionID.equals(leaderSessionID)) {
+			throw new LeaderIdMismatchException(this.leaderSessionID, leaderSessionID);
+		}
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Utility classes
 	//----------------------------------------------------------------------------------------------
 
 	private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
+
 		@Override
 		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
-			notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
+				}
+			});
 		}
 
 		@Override
 		public void handleError(final Exception exception) {
-			handleFatalError(exception);
+			handleFatalError(new Exception("Fatal error in the ResourceManager leader service", exception));
 		}
 	}
 
+	//----------------------------------------------------------------------------------------------
+
 	private class ResourceManagerConnection
 			extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess>
 	{
 		private final JobID jobID;
 
+		private final String jobManagerRpcAddress;
+
 		private final UUID jobManagerLeaderID;
 
 		ResourceManagerConnection(
 				final Logger log,
 				final JobID jobID,
+				final String jobManagerRpcAddress,
 				final UUID jobManagerLeaderID,
 				final String resourceManagerAddress,
 				final UUID resourceManagerLeaderID,
@@ -936,6 +905,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		{
 			super(log, resourceManagerAddress, resourceManagerLeaderID, executor);
 			this.jobID = checkNotNull(jobID);
+			this.jobManagerRpcAddress = checkNotNull(jobManagerRpcAddress);
 			this.jobManagerLeaderID = checkNotNull(jobManagerLeaderID);
 		}
 
@@ -946,18 +916,29 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 					getTargetAddress(), getTargetLeaderId())
 			{
 				@Override
-				protected Future<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, UUID leaderId,
-						long timeoutMillis) throws Exception
+				protected Future<RegistrationResponse> invokeRegistration(
+						ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
 				{
 					Time timeout = Time.milliseconds(timeoutMillis);
-					return gateway.registerJobManager(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout);
+
+					return gateway.registerJobManager(
+						leaderId,
+						jobManagerLeaderID,
+						jobManagerRpcAddress,
+						jobID,
+						timeout);
 				}
 			};
 		}
 
 		@Override
 		protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
-			onResourceManagerRegistrationSuccess(success);
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					onResourceManagerRegistrationSuccess(success);
+				}
+			});
 		}
 
 		@Override
@@ -965,4 +946,25 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			handleFatalError(failure);
 		}
 	}
+
+	//----------------------------------------------------------------------------------------------
+
+	private class JobManagerJobStatusListener implements JobStatusListener {
+
+		@Override
+		public void jobStatusChanges(
+				final JobID jobId,
+				final JobStatus newJobStatus,
+				final long timestamp,
+				final Throwable error) {
+
+			// run in rpc thread to avoid concurrency
+			runAsync(new Runnable() {
+				@Override
+				public void run() {
+					jobStatusChanged(newJobStatus, timestamp, error);
+				}
+			});
+		}
+	}
 }