You are viewing a plain-text version of this content. The canonical version is available at the original archive link (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:21 UTC

[02/52] [abbrv] flink git commit: [FLINK-4375] [distributed coordination] Implement new JobManager creation, initialization, and basic RPC methods

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index b94f904..abc59cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,26 +18,19 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
@@ -48,9 +41,10 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -59,16 +53,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
-import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
-import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -84,22 +73,26 @@ import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
+
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -110,16 +103,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * It offers the following methods as part of its rpc interface to interact with the JobMaster
  * remotely:
  * <ul>
- * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
+ * <li>{@link #updateTaskExecutionState} updates the task execution state for
  * given task</li>
  * </ul>
  */
 public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
+	private static final AtomicReferenceFieldUpdater<JobMaster, UUID> LEADER_ID_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(JobMaster.class, UUID.class, "leaderSessionID");
+
+	// ------------------------------------------------------------------------
+
 	/** Logical representation of the job */
 	private final JobGraph jobGraph;
 
-	/** Configuration of the job */
+	/** Configuration of the JobManager */
 	private final Configuration configuration;
 
 	/** Service to contend for and retrieve the leadership of JM and RM */
@@ -128,37 +126,24 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** Blob cache manager used across jobs */
 	private final BlobLibraryCacheManager libraryCacheManager;
 
-	/** Factory to create restart strategy for this job */
-	private final RestartStrategyFactory restartStrategyFactory;
-
-	/** Store for save points */
-	private final SavepointStore savepointStore;
-
-	/** The timeout for this job */
-	private final Time timeout;
-
-	/** The scheduler to use for scheduling new tasks as they are needed */
-	private final Scheduler scheduler;
+	/** The metrics for the JobManager itself */
+	private final MetricGroup jobManagerMetricGroup;
 
-	/** The metrics group used across jobs */
-	private final JobManagerMetricGroup jobManagerMetricGroup;
+	/** The metrics for the job */
+	private final MetricGroup jobMetricGroup;
 
 	/** The execution context which is used to execute futures */
-	private final Executor executionContext;
+	private final ExecutorService executionContext;
 
 	private final OnCompletionActions jobCompletionActions;
 
-	/** The execution graph of this job */
-	private volatile ExecutionGraph executionGraph;
-
-	/** The checkpoint recovery factory used by this job */
-	private CheckpointRecoveryFactory checkpointRecoveryFactory;
+	private final FatalErrorHandler errorHandler;
 
-	private ClassLoader userCodeLoader;
+	private final ClassLoader userCodeLoader;
 
-	private RestartStrategy restartStrategy;
+	/** The execution graph of this job */
+	private final ExecutionGraph executionGraph;
 
-	private MetricGroup jobMetrics;
 
 	private volatile UUID leaderSessionID;
 
@@ -168,22 +153,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	private LeaderRetrievalService resourceManagerLeaderRetriever;
 
 	/** Connection with ResourceManager, null if not located address yet or we close it initiative */
-	private volatile ResourceManagerConnection resourceManagerConnection;
+	private ResourceManagerConnection resourceManagerConnection;
+
+	// TODO - we need to replace this with the slot pool
+	private final Scheduler scheduler;
 
 	// ------------------------------------------------------------------------
 
 	public JobMaster(
-		JobGraph jobGraph,
-		Configuration configuration,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityService,
-		BlobLibraryCacheManager libraryCacheManager,
-		RestartStrategyFactory restartStrategyFactory,
-		SavepointStore savepointStore,
-		Time timeout,
-		Scheduler scheduler,
-		JobManagerMetricGroup jobManagerMetricGroup,
-		OnCompletionActions jobCompletionActions)
+			JobGraph jobGraph,
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityService,
+			ExecutorService executorService,
+			BlobLibraryCacheManager libraryCacheManager,
+			RestartStrategyFactory restartStrategyFactory,
+			Time rpcAskTimeout,
+			@Nullable JobManagerMetricGroup jobManagerMetricGroup,
+			OnCompletionActions jobCompletionActions,
+			FatalErrorHandler errorHandler,
+			ClassLoader userCodeLoader) throws Exception
 	{
 		super(rpcService);
 
@@ -191,293 +180,150 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		this.configuration = checkNotNull(configuration);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
-		this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
-		this.savepointStore = checkNotNull(savepointStore);
-		this.timeout = checkNotNull(timeout);
-		this.scheduler = checkNotNull(scheduler);
-		this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
-		this.executionContext = checkNotNull(rpcService.getExecutor());
+		this.executionContext = checkNotNull(executorService);
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
-	}
-
-	//----------------------------------------------------------------------------------------------
-	// Lifecycle management
-	//----------------------------------------------------------------------------------------------
+		this.errorHandler = checkNotNull(errorHandler);
+		this.userCodeLoader = checkNotNull(userCodeLoader);
 
-	/**
-	 * Initializing the job execution environment, should be called before start. Any error occurred during
-	 * initialization will be treated as job submission failure.
-	 *
-	 * @throws JobSubmissionException
-	 */
-	public void init() throws JobSubmissionException {
-		log.info("Initializing job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
+		final String jobName = jobGraph.getName();
+		final JobID jid = jobGraph.getJobID();
 
-		try {
-			// IMPORTANT: We need to make sure that the library registration is the first action,
-			// because this makes sure that the uploaded jar files are removed in case of
-			// unsuccessful
-			try {
-				libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(),
-					jobGraph.getClasspaths());
-			} catch (Throwable t) {
-				throw new JobSubmissionException(jobGraph.getJobID(),
-					"Cannot set up the user code libraries: " + t.getMessage(), t);
-			}
+		if (jobManagerMetricGroup != null) {
+			this.jobManagerMetricGroup = jobManagerMetricGroup;
+			this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph);
+		} else {
+			this.jobManagerMetricGroup = new UnregisteredMetricsGroup();
+			this.jobMetricGroup = new UnregisteredMetricsGroup();
+		}
 
-			userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
-			if (userCodeLoader == null) {
-				throw new JobSubmissionException(jobGraph.getJobID(),
-					"The user code class loader could not be initialized.");
-			}
+		log.info("Initializing job {} ({}).", jobName, jid);
 
-			if (jobGraph.getNumberOfVertices() == 0) {
-				throw new JobSubmissionException(jobGraph.getJobID(), "The given job is empty");
-			}
-
-			final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
+		final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
 				jobGraph.getSerializedExecutionConfig()
-					.deserializeValue(userCodeLoader)
-					.getRestartStrategy();
-			if (restartStrategyConfiguration != null) {
-				restartStrategy = RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration);
-			}
-			else {
-				restartStrategy = restartStrategyFactory.createRestartStrategy();
-			}
+						.deserializeValue(userCodeLoader)
+						.getRestartStrategy();
 
-			log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobGraph.getName(), jobGraph.getJobID());
+		final RestartStrategy restartStrategy = (restartStrategyConfiguration != null) ?
+				RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) :
+				restartStrategyFactory.createRestartStrategy();
 
-			if (jobManagerMetricGroup != null) {
-				jobMetrics = jobManagerMetricGroup.addJob(jobGraph);
-			}
-			if (jobMetrics == null) {
-				jobMetrics = new UnregisteredMetricsGroup();
-			}
+		log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid);
 
-			try {
-				checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
-			} catch (Exception e) {
-				log.error("Could not get the checkpoint recovery factory.", e);
-				throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint recovery factory.", e);
-			}
+		CheckpointRecoveryFactory checkpointRecoveryFactory;
+		try {
+			checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
+		} catch (Exception e) {
+			log.error("Could not create the access to highly-available checkpoint storage.", e);
+			throw new Exception("Could not create the access to highly-available checkpoint storage.", e);
+		}
 
-			try {
-				resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
-			} catch (Exception e) {
-				log.error("Could not get the resource manager leader retriever.", e);
-				throw new JobSubmissionException(jobGraph.getJobID(),
+		try {
+			resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
+		} catch (Exception e) {
+			log.error("Could not get the resource manager leader retriever.", e);
+			throw new JobSubmissionException(jobGraph.getJobID(),
 					"Could not get the resource manager leader retriever.", e);
-			}
-		} catch (Throwable t) {
-			log.error("Failed to initializing job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
-
-			libraryCacheManager.unregisterJob(jobGraph.getJobID());
-
-			if (t instanceof JobSubmissionException) {
-				throw (JobSubmissionException) t;
-			}
-			else {
-				throw new JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " +
-					jobGraph.getName() + " (" + jobGraph.getJobID() + ")", t);
-			}
 		}
+
+		this.executionGraph = ExecutionGraphBuilder.buildGraph(
+				null,
+				jobGraph,
+				configuration,
+				executorService,
+				executorService,
+				userCodeLoader,
+				checkpointRecoveryFactory,
+				rpcAskTimeout,
+				restartStrategy,
+				jobMetricGroup,
+				-1,
+				log);
+
+		// TODO - temp fix
+		this.scheduler = new Scheduler(executorService);
 	}
 
+	//----------------------------------------------------------------------------------------------
+	// Lifecycle management
+	//----------------------------------------------------------------------------------------------
+
+
 	@Override
 	public void start() {
-		super.start();
+		throw new UnsupportedOperationException("Should never call start() without leader ID");
 	}
 
+	/**
+	 * Start the rpc service and begin to run the job.
+	 *
+	 * @param leaderSessionID The necessary leader id for running the job.
+	 */
+	public void start(final UUID leaderSessionID) {
+		if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
+			super.start();
+
+			log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID());
+			getSelf().startJobExecution();
+		} else {
+			log.warn("Job already started with leaderId {}, ignoring this start request.", leaderSessionID);
+		}
+	}
+
+	/**
+	 * Suspend the job and shutdown all other services including rpc.
+	 */
 	@Override
 	public void shutDown() {
+		// make sure there is a graceful exit
+		getSelf().suspendExecution(new Exception("JobManager is shutting down."));
 		super.shutDown();
-
-		suspendJob(new Exception("JobManager is shutting down."));
-
-		disposeCommunicationWithResourceManager();
 	}
 
-
-
 	//----------------------------------------------------------------------------------------------
 	// RPC methods
 	//----------------------------------------------------------------------------------------------
 
-	/**
-	 * Start to run the job, runtime data structures like ExecutionGraph will be constructed now and checkpoint
-	 * being recovered. After this, we will begin to schedule the job.
-	 */
-	@RpcMethod
-	public void startJob(final UUID leaderSessionID) {
-		log.info("Starting job {} ({}) with leaderId {}.", jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
+	//-- job starting and stopping  -----------------------------------------------------------------
 
-		this.leaderSessionID = leaderSessionID;
+	@RpcMethod
+	public void startJobExecution() {
+		log.info("Starting execution of job {} ({}) with leaderId {}.",
+				jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
 
 		try {
-			if (executionGraph != null) {
-				executionGraph = new ExecutionGraph(
-						executionContext,
-						executionContext,
-						jobGraph.getJobID(),
-						jobGraph.getName(),
-						jobGraph.getJobConfiguration(),
-						jobGraph.getSerializedExecutionConfig(),
-						timeout,
-						restartStrategy,
-						jobGraph.getUserJarBlobKeys(),
-						jobGraph.getClasspaths(),
-						userCodeLoader,
-						jobMetrics);
-			} else {
-				// TODO: update last active time in JobInfo
-			}
-
-			executionGraph.setScheduleMode(jobGraph.getScheduleMode());
-			executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
-
-			try {
-				executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
-			} catch (Exception e) {
-				log.warn("Cannot create JSON plan for job {} ({})", jobGraph.getJobID(), jobGraph.getName(), e);
-				executionGraph.setJsonPlan("{}");
-			}
-
-			// initialize the vertices that have a master initialization hook
-			// file output formats create directories here, input formats create splits
-			if (log.isDebugEnabled()) {
-				log.debug("Running initialization on master for job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
-			}
-			for (JobVertex vertex : jobGraph.getVertices()) {
-				final String executableClass = vertex.getInvokableClassName();
-				if (executableClass == null || executableClass.length() == 0) {
-					throw new JobExecutionException(jobGraph.getJobID(),
-						"The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
-				}
-				if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
-					vertex.setParallelism(scheduler.getTotalNumberOfSlots());
-				}
-
-				try {
-					vertex.initializeOnMaster(userCodeLoader);
-				} catch (Throwable t) {
-					throw new JobExecutionException(jobGraph.getJobID(),
-						"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
-				}
-			}
-
-			// topologically sort the job vertices and attach the graph to the existing one
-			final List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
-			if (log.isDebugEnabled()) {
-				log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(),
-					jobGraph.getJobID(), jobGraph.getName());
-			}
-			executionGraph.attachJobGraph(sortedTopology);
-
-			if (log.isDebugEnabled()) {
-				log.debug("Successfully created execution graph from job graph {} ({}).",
-					jobGraph.getJobID(), jobGraph.getName());
-			}
-
-			final JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings();
-			if (snapshotSettings != null) {
-				List<ExecutionJobVertex> triggerVertices = getExecutionJobVertexWithId(
-					executionGraph, snapshotSettings.getVerticesToTrigger());
-
-				List<ExecutionJobVertex> ackVertices = getExecutionJobVertexWithId(
-					executionGraph, snapshotSettings.getVerticesToAcknowledge());
-
-				List<ExecutionJobVertex> confirmVertices = getExecutionJobVertexWithId(
-					executionGraph, snapshotSettings.getVerticesToConfirm());
-
-				CompletedCheckpointStore completedCheckpoints = checkpointRecoveryFactory.createCheckpointStore(
-					jobGraph.getJobID(), userCodeLoader);
-
-				CheckpointIDCounter checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(
-					jobGraph.getJobID());
-
-				// Checkpoint stats tracker
-				boolean isStatsDisabled = configuration.getBoolean(
-					ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
-					ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
-
-				final CheckpointStatsTracker checkpointStatsTracker;
-				if (isStatsDisabled) {
-					checkpointStatsTracker = new DisabledCheckpointStatsTracker();
-				}
-				else {
-					int historySize = configuration.getInteger(
-						ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-						ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
-					checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics);
-				}
-
-				String externalizedCheckpointsDir = configuration.getString(
-						ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null);
-
-				executionGraph.enableSnapshotCheckpointing(
-					snapshotSettings.getCheckpointInterval(),
-					snapshotSettings.getCheckpointTimeout(),
-					snapshotSettings.getMinPauseBetweenCheckpoints(),
-					snapshotSettings.getMaxConcurrentCheckpoints(),
-					snapshotSettings.getExternalizedCheckpointSettings(),
-					triggerVertices,
-					ackVertices,
-					confirmVertices,
-					checkpointIdCounter,
-					completedCheckpoints,
-					externalizedCheckpointsDir,
-					checkpointStatsTracker);
-			}
-
-			// TODO: register this class to execution graph as job status change listeners
-
-			// TODO: register client as job / execution status change listeners if they are interested
-
-			/*
-			TODO: decide whether we should take the savepoint before recovery
-
-			if (isRecovery) {
-				// this is a recovery of a master failure (this master takes over)
-				executionGraph.restoreLatestCheckpointedState();
-			} else {
-				if (snapshotSettings != null) {
-					String savepointPath = snapshotSettings.getSavepointPath();
-					if (savepointPath != null) {
-						// got a savepoint
-						log.info("Starting job from savepoint {}.", savepointPath);
-
-						// load the savepoint as a checkpoint into the system
-						final CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
-							jobGraph.getJobID(), executionGraph.getAllVertices(), savepointStore, savepointPath);
-						executionGraph.getCheckpointCoordinator().getCheckpointStore().addCheckpoint(savepoint);
-
-						// Reset the checkpoint ID counter
-						long nextCheckpointId = savepoint.getCheckpointID() + 1;
-						log.info("Reset the checkpoint ID to " + nextCheckpointId);
-						executionGraph.getCheckpointCoordinator().getCheckpointIdCounter().setCount(nextCheckpointId);
-
-						executionGraph.restoreLatestCheckpointedState();
-					}
+			// register self as job status change listener
+			executionGraph.registerJobStatusListener(new JobStatusListener() {
+				@Override
+				public void jobStatusChanges(
+						final JobID jobId, final JobStatus newJobStatus, final long timestamp, final Throwable error)
+				{
+					// run in rpc thread to avoid concurrency
+					runAsync(new Runnable() {
+						@Override
+						public void run() {
+							jobStatusChanged(newJobStatus, timestamp, error);
+						}
+					});
 				}
-			}
-			*/
+			});
 
-			// job is good to go, try to locate resource manager's address
+			// job is ready to go, try to establish connection with resource manager
 			resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
 		} catch (Throwable t) {
+
+			// TODO - this should not result in a job failure, but another leader should take over
+			// TODO - either this master should retry the execution, or it should relinquish leadership / terminate
+
 			log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
 
 			executionGraph.fail(t);
-			executionGraph = null;
 
-			final Throwable rt;
+			final JobExecutionException rt;
 			if (t instanceof JobExecutionException) {
 				rt = (JobExecutionException) t;
-			}
-			else {
+			} else {
 				rt = new JobExecutionException(jobGraph.getJobID(),
-					"Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
+						"Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
 			}
 
 			// TODO: notify client about this failure
@@ -490,34 +336,51 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		executionContext.execute(new Runnable() {
 			@Override
 			public void run() {
-				if (executionGraph != null) {
-					try {
-						executionGraph.scheduleForExecution(scheduler);
-					} catch (Throwable t) {
-						executionGraph.fail(t);
-					}
+				try {
+					executionGraph.scheduleForExecution(scheduler);
+				} catch (Throwable t) {
+					executionGraph.fail(t);
 				}
 			}
 		});
 	}
 
 	/**
-	 * Suspending job, all the running tasks will be cancelled, and runtime data will be cleared.
+	 * Suspending job, all the running tasks will be cancelled, and communication with other components
+	 * will be disposed.
+	 *
+	 * <p>A job is usually suspended because leadership has been revoked; it can be restarted by
+	 * calling the {@link #start(UUID)} method once leadership has been regained.
+	 *
+	 * @param cause The reason why this job has been suspended.
 	 */
 	@RpcMethod
-	public void suspendJob(final Throwable cause) {
+	public void suspendExecution(final Throwable cause) {
+		if (leaderSessionID == null) {
+			log.debug("Job has already been suspended or shutdown.");
+			return;
+		}
+
+		// receive no more messages until started again, should be called before we clear self leader id
+		((StartStoppable) getSelf()).stop();
+
 		leaderSessionID = null;
+		executionGraph.suspend(cause);
 
-		if (executionGraph != null) {
-			executionGraph.suspend(cause);
-			executionGraph = null;
+		// disconnect from resource manager:
+		try {
+			resourceManagerLeaderRetriever.stop();
+		} catch (Exception e) {
+			log.warn("Failed to stop resource manager leader retriever when suspending.");
 		}
+		closeResourceManagerConnection();
+
+		// TODO: disconnect from all registered task managers
 
-		disposeCommunicationWithResourceManager();
 	}
 
+	//----------------------------------------------------------------------------------------------
+
 	/**
 	 * Updates the task execution state for a given task.
 	 *
@@ -525,26 +388,38 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 * @return Acknowledge the task execution state update
 	 */
 	@RpcMethod
-	public Acknowledge updateTaskExecutionState(final TaskExecutionState taskExecutionState) throws ExecutionGraphException {
+	public Acknowledge updateTaskExecutionState(
+			final UUID leaderSessionID,
+			final TaskExecutionState taskExecutionState) throws Exception
+	{
 		if (taskExecutionState == null) {
 			throw new NullPointerException("TaskExecutionState must not be null.");
 		}
 
+		if (!this.leaderSessionID.equals(leaderSessionID)) {
+			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+					+ ", actual: " + leaderSessionID);
+		}
+
 		if (executionGraph.updateState(taskExecutionState)) {
 			return Acknowledge.get();
 		} else {
 			throw new ExecutionGraphException("The execution attempt " +
-				taskExecutionState.getID() + " was not found.");
+					taskExecutionState.getID() + " was not found.");
 		}
-
 	}
 
-
 	@RpcMethod
 	public SerializedInputSplit requestNextInputSplit(
-		final JobVertexID vertexID,
-		final ExecutionAttemptID executionAttempt) throws Exception
+			final UUID leaderSessionID,
+			final JobVertexID vertexID,
+			final ExecutionAttemptID executionAttempt) throws Exception
 	{
+		if (!this.leaderSessionID.equals(leaderSessionID)) {
+			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+					+ ", actual: " + leaderSessionID);
+		}
+
 		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
 		if (execution == null) {
 			// can happen when JobManager had already unregistered this execution upon on task failure,
@@ -583,7 +458,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		} catch (Exception ex) {
 			log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
 			IOException reason = new IOException("Could not serialize the next input split of class " +
-				nextInputSplit.getClass() + ".", ex);
+					nextInputSplit.getClass() + ".", ex);
 			vertex.fail(reason);
 			throw reason;
 		}
@@ -591,16 +466,21 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	@RpcMethod
 	public ExecutionState requestPartitionState(
-		final JobID ignored,
-		final IntermediateDataSetID intermediateResultId,
-		final ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
+			final UUID leaderSessionID,
+			final IntermediateDataSetID intermediateResultId,
+			final ResultPartitionID resultPartitionId) throws Exception {
+
+		if (!this.leaderSessionID.equals(leaderSessionID)) {
+			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+					+ ", actual: " + leaderSessionID);
+		}
 
 		final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
 		if (execution != null) {
 			return execution.getState();
 		}
 		else {
-			final IntermediateResult intermediateResult = 
+			final IntermediateResult intermediateResult =
 					executionGraph.getAllIntermediateResults().get(intermediateResultId);
 
 			if (intermediateResult != null) {
@@ -623,7 +503,15 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	@RpcMethod
-	public Acknowledge scheduleOrUpdateConsumers(ResultPartitionID partitionID) throws ExecutionGraphException {
+	public Acknowledge scheduleOrUpdateConsumers(
+			final UUID leaderSessionID,
+			final ResultPartitionID partitionID) throws Exception
+	{
+		if (!this.leaderSessionID.equals(leaderSessionID)) {
+			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+					+ ", actual: " + leaderSessionID);
+		}
+
 		executionGraph.scheduleOrUpdateConsumers(partitionID);
 		return Acknowledge.get();
 	}
@@ -638,171 +526,118 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			final JobID jobID,
 			final ExecutionAttemptID executionAttemptID,
 			final CheckpointMetaData checkpointInfo,
-			final SubtaskState checkpointStateHandles) {
+			final SubtaskState checkpointState) throws CheckpointException {
 
-		throw new UnsupportedOperationException();
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+		final AcknowledgeCheckpoint ackMessage = 
+				new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointInfo, checkpointState);
+
+		if (checkpointCoordinator != null) {
+			getRpcService().execute(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
+					} catch (Throwable t) {
+						log.warn("Error while processing checkpoint acknowledgement message");
+					}
+				}
+			});
+		} else {
+			log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
+					jobGraph.getJobID());
+		}
 	}
 
 	@RpcMethod
 	public void declineCheckpoint(
 			final JobID jobID,
 			final ExecutionAttemptID executionAttemptID,
-			final long checkpointId,
-			final Throwable cause) {
-
-		throw new UnsupportedOperationException();
-	}
-
-	//----------------------------------------------------------------------------------------------
-	// Internal methods
-	//----------------------------------------------------------------------------------------------
-
-	@RpcMethod
-	public void resourceRemoved(final ResourceID resourceId, final String message) {
-		// TODO: remove resource from slot pool
-	}
+			final long checkpointID,
+			final Throwable reason)
+	{
+		final DeclineCheckpoint decline = new DeclineCheckpoint(
+				jobID, executionAttemptID, checkpointID, reason);
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
 
-	@RpcMethod
-	public void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge) {
-		if (executionGraph != null) {
-			final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-			if (checkpointCoordinator != null) {
-				getRpcService().execute(new Runnable() {
-					@Override
-					public void run() {
-						try {
-							if (!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) {
-								log.info("Received message for non-existing checkpoint {}.",
-									acknowledge.getCheckpointId());
-							}
-						} catch (Exception e) {
-							log.error("Error in CheckpointCoordinator while processing {}", acknowledge, e);
-						}
+		if (checkpointCoordinator != null) {
+			getRpcService().execute(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						checkpointCoordinator.receiveDeclineMessage(decline);
+					} catch (Exception e) {
+						log.error("Error in CheckpointCoordinator while processing {}", decline, e);
 					}
-				});
-			}
-			else {
-				log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
-					jobGraph.getJobID());
-			}
+				}
+			});
 		} else {
-			log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID());
-		}
-	}
-
-	@RpcMethod
-	public void declineCheckpoint(final DeclineCheckpoint decline) {
-		if (executionGraph != null) {
-			final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-			if (checkpointCoordinator != null) {
-				getRpcService().execute(new Runnable() {
-					@Override
-					public void run() {
-						try {
-							log.info("Received message for non-existing checkpoint {}.", decline.getCheckpointId());
-						} catch (Exception e) {
-							log.error("Error in CheckpointCoordinator while processing {}", decline, e);
-						}
-					}
-				});
-			} else {
-				log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator",
+			log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator",
 					jobGraph.getJobID());
-			}
-		} else {
-			log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID());
 		}
 	}
 
 	@RpcMethod
 	public KvStateLocation lookupKvStateLocation(final String registrationName) throws Exception {
-		if (executionGraph != null) {
-			if (log.isDebugEnabled()) {
-				log.debug("Lookup key-value state for job {} with registration " +
+		if (log.isDebugEnabled()) {
+			log.debug("Lookup key-value state for job {} with registration " +
 					"name {}.", jobGraph.getJobID(), registrationName);
-			}
+		}
 
-			final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
-			final KvStateLocation location = registry.getKvStateLocation(registrationName);
-			if (location != null) {
-				return location;
-			} else {
-				throw new UnknownKvStateLocation(registrationName);
-			}
+		final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
+		final KvStateLocation location = registry.getKvStateLocation(registrationName);
+		if (location != null) {
+			return location;
 		} else {
-			throw new IllegalStateException("Received lookup KvState location request for unavailable job " +
-				jobGraph.getJobID());
+			throw new UnknownKvStateLocation(registrationName);
 		}
 	}
 
 	@RpcMethod
 	public void notifyKvStateRegistered(
-		final JobVertexID jobVertexId,
-		final KeyGroupRange keyGroupRange,
-		final String registrationName,
-		final KvStateID kvStateId,
-		final KvStateServerAddress kvStateServerAddress)
+			final JobVertexID jobVertexId,
+			final KeyGroupRange keyGroupRange,
+			final String registrationName,
+			final KvStateID kvStateId,
+			final KvStateServerAddress kvStateServerAddress)
 	{
-		if (executionGraph != null) {
-			if (log.isDebugEnabled()) {
-				log.debug("Key value state registered for job {} under name {}.",
+		if (log.isDebugEnabled()) {
+			log.debug("Key value state registered for job {} under name {}.",
 					jobGraph.getJobID(), registrationName);
-			}
-			try {
-				executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
-					jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress
-				);
-			} catch (Exception e) {
-				log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
-			}
-		} else {
-			log.error("Received notify KvState registered request for unavailable job " + jobGraph.getJobID());
+		}
+
+		try {
+			executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
+					jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
+		} catch (Exception e) {
+			log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
 		}
 	}
 
 	@RpcMethod
 	public void notifyKvStateUnregistered(
-		JobVertexID jobVertexId,
-		KeyGroupRange keyGroupRange,
-		String registrationName)
+			JobVertexID jobVertexId,
+			KeyGroupRange keyGroupRange,
+			String registrationName)
 	{
-		if (executionGraph != null) {
-			if (log.isDebugEnabled()) {
-				log.debug("Key value state unregistered for job {} under name {}.",
+		if (log.isDebugEnabled()) {
+			log.debug("Key value state unregistered for job {} under name {}.",
 					jobGraph.getJobID(), registrationName);
-			}
-			try {
-				executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
-					jobVertexId, keyGroupRange, registrationName
-				);
-			} catch (Exception e) {
-				log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
-			}
-		} else {
-			log.error("Received notify KvState unregistered request for unavailable job " + jobGraph.getJobID());
 		}
-	}
 
-	@RpcMethod
-	public Future<TriggerSavepointResponse> triggerSavepoint() throws Exception {
-		return null;
-	}
-
-	@RpcMethod
-	public DisposeSavepointResponse disposeSavepoint(final String savepointPath) {
-		// TODO
-		return null;
+		try {
+			executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
+					jobVertexId, keyGroupRange, registrationName);
+		} catch (Exception e) {
+			log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
+		}
 	}
 
 	@RpcMethod
 	public ClassloadingProps requestClassloadingProps() throws Exception {
-		if (executionGraph != null) {
-			return new ClassloadingProps(libraryCacheManager.getBlobServerPort(),
+		return new ClassloadingProps(libraryCacheManager.getBlobServerPort(),
 				executionGraph.getRequiredJarFiles(),
 				executionGraph.getRequiredClasspaths());
-		} else {
-			throw new Exception("Received classloading props request for unavailable job " + jobGraph.getJobID());
-		}
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -815,12 +650,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			public void run() {
 				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
 				shutDown();
-				jobCompletionActions.onFatalError(cause);
+				errorHandler.onFatalError(cause);
 			}
 		});
 	}
 
-	// TODO - wrap this as StatusListenerMessenger's callback with rpc main thread
 	private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
 		final JobID jobID = executionGraph.getJobID();
 		final String jobName = executionGraph.getJobName();
@@ -848,36 +682,33 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			if (newJobStatus == JobStatus.FINISHED) {
 				try {
 					final Map<String, SerializedValue<Object>> accumulatorResults =
-						executionGraph.getAccumulatorsSerialized();
+							executionGraph.getAccumulatorsSerialized();
 					final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
-						jobID, 0, accumulatorResults // TODO get correct job duration
+							jobID, 0, accumulatorResults // TODO get correct job duration
 					);
 					jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
 				} catch (Exception e) {
 					log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
 					final JobExecutionException exception = new JobExecutionException(
-						jobID, "Failed to retrieve accumulator results.", e);
+							jobID, "Failed to retrieve accumulator results.", e);
 					// TODO should we also notify client?
 					jobCompletionActions.jobFailed(exception);
 				}
-			}
-			else if (newJobStatus == JobStatus.CANCELED) {
+			} else if (newJobStatus == JobStatus.CANCELED) {
 				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
 				final JobExecutionException exception = new JobExecutionException(
-					jobID, "Job was cancelled.", unpackedError);
+						jobID, "Job was cancelled.", unpackedError);
 				// TODO should we also notify client?
 				jobCompletionActions.jobFailed(exception);
-			}
-			else if (newJobStatus == JobStatus.FAILED) {
+			} else if (newJobStatus == JobStatus.FAILED) {
 				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
 				final JobExecutionException exception = new JobExecutionException(
-					jobID, "Job execution failed.", unpackedError);
+						jobID, "Job execution failed.", unpackedError);
 				// TODO should we also notify client?
 				jobCompletionActions.jobFailed(exception);
-			}
-			else {
+			} else {
 				final JobExecutionException exception = new JobExecutionException(
-					jobID, newJobStatus + " is not a terminal state.");
+						jobID, newJobStatus + " is not a terminal state.");
 				// TODO should we also notify client?
 				jobCompletionActions.jobFailed(exception);
 				throw new RuntimeException(exception);
@@ -886,7 +717,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	private void notifyOfNewResourceManagerLeader(
-		final String resourceManagerAddress, final UUID resourceManagerLeaderId)
+			final String resourceManagerAddress, final UUID resourceManagerLeaderId)
 	{
 		// IMPORTANT: executed by main thread to avoid concurrence
 		runAsync(new Runnable() {
@@ -895,17 +726,15 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				if (resourceManagerConnection != null) {
 					if (resourceManagerAddress != null) {
 						if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
-							&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
-						{
+								&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
 							// both address and leader id are not changed, we can keep the old connection
 							return;
 						}
 						log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-							resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
-					}
-					else {
+								resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+					} else {
 						log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
-							resourceManagerConnection.getTargetAddress());
+								resourceManagerConnection.getTargetAddress());
 					}
 				}
 
@@ -914,8 +743,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				if (resourceManagerAddress != null) {
 					log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
 					resourceManagerConnection = new ResourceManagerConnection(
-						log, jobGraph.getJobID(), leaderSessionID,
-						resourceManagerAddress, resourceManagerLeaderId, executionContext);
+							log, jobGraph.getJobID(), leaderSessionID,
+							resourceManagerAddress, resourceManagerLeaderId, executionContext);
 					resourceManagerConnection.start();
 				}
 			}
@@ -929,26 +758,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				// TODO - add tests for comment in https://github.com/apache/flink/pull/2565
 				// verify the response with current connection
 				if (resourceManagerConnection != null
-					&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
+						&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
 					log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
-						success.getResourceManagerLeaderId());
+							success.getResourceManagerLeaderId());
 				}
 			}
 		});
 	}
 
-	private void disposeCommunicationWithResourceManager() {
-		// 1. stop the leader retriever so we will not receiving updates anymore
-		try {
-			resourceManagerLeaderRetriever.stop();
-		} catch (Exception e) {
-			log.warn("Failed to stop resource manager leader retriever.");
-		}
-
-		// 2. close current connection with ResourceManager if exists
-		closeResourceManagerConnection();
-	}
-
 	private void closeResourceManagerConnection() {
 		if (resourceManagerConnection != null) {
 			resourceManagerConnection.close();
@@ -957,34 +774,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	//----------------------------------------------------------------------------------------------
-	// Helper methods
-	//----------------------------------------------------------------------------------------------
-
-	/**
-	 * Converts JobVertexIDs to corresponding ExecutionJobVertexes
-	 *
-	 * @param executionGraph The execution graph that holds the relationship
-	 * @param vertexIDs      The vertexIDs need to be converted
-	 * @return The corresponding ExecutionJobVertexes
-	 * @throws JobExecutionException
-	 */
-	private static List<ExecutionJobVertex> getExecutionJobVertexWithId(
-		final ExecutionGraph executionGraph, final List<JobVertexID> vertexIDs)
-		throws JobExecutionException
-	{
-		final List<ExecutionJobVertex> ret = new ArrayList<>(vertexIDs.size());
-		for (JobVertexID vertexID : vertexIDs) {
-			final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(vertexID);
-			if (executionJobVertex == null) {
-				throw new JobExecutionException(executionGraph.getJobID(),
-					"The snapshot checkpointing settings refer to non-existent vertex " + vertexID);
-			}
-			ret.add(executionJobVertex);
-		}
-		return ret;
-	}
-
-	//----------------------------------------------------------------------------------------------
 	// Utility classes
 	//----------------------------------------------------------------------------------------------
 
@@ -1001,19 +790,19 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	private class ResourceManagerConnection
-		extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess>
+			extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess>
 	{
 		private final JobID jobID;
 
 		private final UUID jobManagerLeaderID;
 
 		ResourceManagerConnection(
-			final Logger log,
-			final JobID jobID,
-			final UUID jobManagerLeaderID,
-			final String resourceManagerAddress,
-			final UUID resourceManagerLeaderID,
-			final Executor executor)
+				final Logger log,
+				final JobID jobID,
+				final UUID jobManagerLeaderID,
+				final String resourceManagerAddress,
+				final UUID resourceManagerLeaderID,
+				final Executor executor)
 		{
 			super(log, resourceManagerAddress, resourceManagerLeaderID, executor);
 			this.jobID = checkNotNull(jobID);
@@ -1023,12 +812,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		@Override
 		protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
 			return new RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess>(
-				log, getRpcService(), "ResourceManager", ResourceManagerGateway.class,
-				getTargetAddress(), getTargetLeaderId())
+					log, getRpcService(), "ResourceManager", ResourceManagerGateway.class,
+					getTargetAddress(), getTargetLeaderId())
 			{
 				@Override
 				protected Future<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, UUID leaderId,
-					long timeoutMillis) throws Exception
+						long timeoutMillis) throws Exception
 				{
 					Time timeout = Time.milliseconds(timeoutMillis);
 					return gateway.registerJobMaster(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 5223b3e..daa33a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -32,11 +31,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
-import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
-import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateServerAddress;
@@ -52,52 +47,54 @@ import java.util.UUID;
  */
 public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 
-	/**
-	 * Starting the job under the given leader session ID.
-	 */
-	void startJob(final UUID leaderSessionID);
+	// ------------------------------------------------------------------------
+	//  Job start and stop methods
+	// ------------------------------------------------------------------------
 
-	/**
-	 * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared.
-	 * Should re-submit the job before restarting it.
-	 *
-	 * @param cause The reason of why this job been suspended.
-	 */
-	void suspendJob(final Throwable cause);
+	void startJobExecution();
+
+	void suspendExecution(Throwable cause);
+
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Updates the task execution state for a given task.
 	 *
+	 * @param leaderSessionID    The leader id of JobManager
 	 * @param taskExecutionState New task execution state for a given task
 	 * @return Future flag of the task execution state update result
 	 */
-	Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
+	Future<Acknowledge> updateTaskExecutionState(
+			final UUID leaderSessionID,
+			final TaskExecutionState taskExecutionState);
 
 	/**
 	 * Requesting next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender
 	 * as a {@link SerializedInputSplit} message.
 	 *
+	 * @param leaderSessionID  The leader id of JobManager
 	 * @param vertexID         The job vertex id
 	 * @param executionAttempt The execution attempt id
 	 * @return The future of the input split. If there is no further input split, will return an empty object.
 	 */
 	Future<SerializedInputSplit> requestNextInputSplit(
-		final JobVertexID vertexID,
-		final ExecutionAttemptID executionAttempt);
+			final UUID leaderSessionID,
+			final JobVertexID vertexID,
+			final ExecutionAttemptID executionAttempt);
 
 	/**
-	 * Requests the current state of the producer of an intermediate result partition.
+	 * Requests the current state of the partition.
 	 * The state of a partition is currently bound to the state of the producing execution.
 	 *
-	 * @param jobId                TheID of job that the intermediate result partition belongs to.
+	 * @param leaderSessionID The leader id of JobManager
 	 * @param intermediateResultId The execution attempt ID of the task requesting the partition state.
 	 * @param partitionId          The partition ID of the partition to request the state of.
 	 * @return The future of the partition state
 	 */
 	Future<ExecutionState> requestPartitionState(
-			JobID jobId,
-			IntermediateDataSetID intermediateResultId,
-			ResultPartitionID partitionId);
+			final UUID leaderSessionID,
+			final IntermediateDataSetID intermediateResultId,
+			final ResultPartitionID partitionId);
 
 	/**
 	 * Notifies the JobManager about available data for a produced partition.
@@ -108,11 +105,15 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * <p>
 	 * The JobManager then can decide when to schedule the partition consumers of the given session.
 	 *
-	 * @param partitionID The partition which has already produced data
-	 * @param timeout before the rpc call fails
+	 * @param leaderSessionID The leader id of JobManager
+	 * @param partitionID     The partition which has already produced data
+	 * @param timeout         before the rpc call fails
 	 * @return Future acknowledge of the schedule or update operation
 	 */
-	Future<Acknowledge> scheduleOrUpdateConsumers(final ResultPartitionID partitionID, @RpcTimeout final Time timeout);
+	Future<Acknowledge> scheduleOrUpdateConsumers(
+			final UUID leaderSessionID,
+			final ResultPartitionID partitionID,
+			@RpcTimeout final Time timeout);
 
 	/**
 	 * Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
@@ -123,36 +124,12 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	void disconnectTaskManager(ResourceID resourceID);
 
 	/**
-	 * Notifies the JobManager about the removal of a resource.
-	 *
-	 * @param resourceId The ID under which the resource is registered.
-	 * @param message    Optional message with details, for logging and debugging.
-	 */
-
-	void resourceRemoved(final ResourceID resourceId, final String message);
-
-	/**
-	 * Notifies the JobManager that the checkpoint of an individual task is completed.
-	 *
-	 * @param acknowledge The acknowledge message of the checkpoint
-	 */
-	void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge);
-
-	/**
-	 * Notifies the JobManager that a checkpoint request could not be heeded.
-	 * This can happen if a Task is already in RUNNING state but is internally not yet ready to perform checkpoints.
-	 *
-	 * @param decline The decline message of the checkpoint
-	 */
-	void declineCheckpoint(final DeclineCheckpoint decline);
-
-	/**
 	 * Requests a {@link KvStateLocation} for the specified {@link KvState} registration name.
 	 *
 	 * @param registrationName Name under which the KvState has been registered.
 	 * @return Future of the requested {@link KvState} location
 	 */
-	Future<KvStateLocation> lookupKvStateLocation(final String registrationName) throws Exception;
+	Future<KvStateLocation> lookupKvStateLocation(final String registrationName);
 
 	/**
 	 * @param jobVertexId          JobVertexID the KvState instance belongs to.
@@ -162,11 +139,11 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param kvStateServerAddress Server address where to find the KvState instance.
 	 */
 	void notifyKvStateRegistered(
-		final JobVertexID jobVertexId,
-		final KeyGroupRange keyGroupRange,
-		final String registrationName,
-		final KvStateID kvStateId,
-		final KvStateServerAddress kvStateServerAddress);
+			final JobVertexID jobVertexId,
+			final KeyGroupRange keyGroupRange,
+			final String registrationName,
+			final KvStateID kvStateId,
+			final KvStateServerAddress kvStateServerAddress);
 
 	/**
 	 * @param jobVertexId      JobVertexID the KvState instance belongs to.
@@ -174,24 +151,9 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param registrationName Name under which the KvState has been registered.
 	 */
 	void notifyKvStateUnregistered(
-		JobVertexID jobVertexId,
-		KeyGroupRange keyGroupRange,
-		String registrationName);
-
-	/**
-	 * Notifies the JobManager to trigger a savepoint for this job.
-	 *
-	 * @return Future of the savepoint trigger response.
-	 */
-	Future<TriggerSavepointResponse> triggerSavepoint();
-
-	/**
-	 * Notifies the Jobmanager to dispose specified savepoint.
-	 *
-	 * @param savepointPath The path of the savepoint.
-	 * @return The future of the savepoint disponse response.
-	 */
-	Future<DisposeSavepointResponse> disposeSavepoint(final String savepointPath);
+			JobVertexID jobVertexId,
+			KeyGroupRange keyGroupRange,
+			String registrationName);
 
 	/**
 	 * Request the classloading props of this job.

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
index e8fb5bb..019ccfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import org.slf4j.Logger;
@@ -62,6 +64,9 @@ public class MiniClusterJobDispatcher {
 	/** al the services that the JobManager needs, such as BLOB service, factories, etc */
 	private final JobManagerServices jobManagerServices;
 
+	/** Registry for all metrics in the mini cluster */
+	private final MetricRegistry metricRegistry;
+
 	/** The number of JobManagers to launch (more than one simulates a high-availability setup) */
 	private final int numJobManagers;
 
@@ -86,8 +91,9 @@ public class MiniClusterJobDispatcher {
 	public MiniClusterJobDispatcher(
 			Configuration config,
 			RpcService rpcService,
-			HighAvailabilityServices haServices) throws Exception {
-		this(config, rpcService, haServices, 1);
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry) throws Exception {
+		this(config, rpcService, haServices, metricRegistry, 1);
 	}
 
 	/**
@@ -106,16 +112,18 @@ public class MiniClusterJobDispatcher {
 			Configuration config,
 			RpcService rpcService,
 			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry,
 			int numJobManagers) throws Exception {
 
 		checkArgument(numJobManagers >= 1);
 		this.configuration = checkNotNull(config);
 		this.rpcService = checkNotNull(rpcService);
 		this.haServices = checkNotNull(haServices);
+		this.metricRegistry = checkNotNull(metricRegistry);
 		this.numJobManagers = numJobManagers;
 
 		LOG.info("Creating JobMaster services");
-		this.jobManagerServices = JobManagerServices.fromConfiguration(config);
+		this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
 	}
 
 	// ------------------------------------------------------------------------
@@ -140,9 +148,8 @@ public class MiniClusterJobDispatcher {
 				if (runners != null) {
 					this.runners = null;
 
-					Exception shutdownException = new Exception("The MiniCluster is shutting down");
 					for (JobManagerRunner runner : runners) {
-						runner.shutdown(shutdownException);
+						runner.shutdown();
 					}
 				}
 			}
@@ -171,9 +178,9 @@ public class MiniClusterJobDispatcher {
 			checkState(!shutdown, "mini cluster is shut down");
 			checkState(runners == null, "mini cluster can only execute one job at a time");
 
-			OnCompletionActions onJobCompletion = new DetachedFinalizer(numJobManagers);
+			DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
 
-			this.runners = startJobRunners(job, onJobCompletion);
+			this.runners = startJobRunners(job, finalizer, finalizer);
 		}
 	}
 
@@ -191,17 +198,17 @@ public class MiniClusterJobDispatcher {
 		checkNotNull(job);
 		
 		LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
-		final BlockingJobSync onJobCompletion = new BlockingJobSync(job.getJobID(), numJobManagers);
+		final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
 
 		synchronized (lock) {
 			checkState(!shutdown, "mini cluster is shut down");
 			checkState(runners == null, "mini cluster can only execute one job at a time");
 
-			this.runners = startJobRunners(job, onJobCompletion);
+			this.runners = startJobRunners(job, sync, sync);
 		}
 
 		try {
-			return onJobCompletion.getResult();
+			return sync.getResult();
 		}
 		finally {
 			// always clear the status for the next job
@@ -209,24 +216,26 @@ public class MiniClusterJobDispatcher {
 		}
 	}
 
-	private JobManagerRunner[] startJobRunners(JobGraph job, OnCompletionActions onCompletion) throws JobExecutionException {
+	private JobManagerRunner[] startJobRunners(
+			JobGraph job,
+			OnCompletionActions onCompletion,
+			FatalErrorHandler errorHandler) throws JobExecutionException {
 		LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
 
 		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
 		for (int i = 0; i < numJobManagers; i++) {
 			try {
 				runners[i] = new JobManagerRunner(job, configuration,
-						rpcService, haServices, jobManagerServices, onCompletion);
+						rpcService, haServices, jobManagerServices, metricRegistry, 
+						onCompletion, errorHandler);
 				runners[i].start();
 			}
 			catch (Throwable t) {
 				// shut down all the ones so far
-				Exception shutdownCause = new Exception("Failed to properly start all mini cluster JobManagers", t);
-
 				for (int k = 0; k <= i; k++) {
 					try {
 						if (runners[i] != null) {
-							runners[i].shutdown(shutdownCause);
+							runners[i].shutdown();
 						}
 					} catch (Throwable ignored) {
 						// silent shutdown
@@ -244,15 +253,15 @@ public class MiniClusterJobDispatcher {
 	//  test methods to simulate job master failures
 	// ------------------------------------------------------------------------
 
-	public void killJobMaster(int which) {
-		checkArgument(which >= 0 && which < numJobManagers, "no such job master");
-		checkState(!shutdown, "mini cluster is shut down");
-
-		JobManagerRunner[] runners = this.runners;
-		checkState(runners != null, "mini cluster it not executing a job right now");
-
-		runners[which].shutdown(new Throwable("kill JobManager"));
-	}
+//	public void killJobMaster(int which) {
+//		checkArgument(which >= 0 && which < numJobManagers, "no such job master");
+//		checkState(!shutdown, "mini cluster is shut down");
+//
+//		JobManagerRunner[] runners = this.runners;
+//		checkState(runners != null, "mini cluster it not executing a job right now");
+//
+//		runners[which].shutdown(new Throwable("kill JobManager"));
+//	}
 
 	// ------------------------------------------------------------------------
 	//  utility classes
@@ -263,7 +272,7 @@ public class MiniClusterJobDispatcher {
 	 * In the case of a high-availability test setup, there may be multiple runners.
 	 * After that, it marks the mini cluster as ready to receive new jobs.
 	 */
-	private class DetachedFinalizer implements OnCompletionActions {
+	private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
 
 		private final AtomicInteger numJobManagersToWaitFor;
 
@@ -308,7 +317,7 @@ public class MiniClusterJobDispatcher {
 	 * That way it is guaranteed that after the blocking job submit call returns,
 	 * the dispatcher is immediately free to accept another job.
 	 */
-	private static class BlockingJobSync implements OnCompletionActions {
+	private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
 
 		private final JobID jobId;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
index 520755d..572ba2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/ClassloadingProps.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.blob.BlobKey;
 import java.io.Serializable;
 import java.net.URL;
 import java.util.Collection;
-import java.util.List;
 
 /**
  * The response of classloading props request to JobManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
deleted file mode 100644
index 42bfc71..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.message;
-
-import java.io.Serializable;
-
-/**
- * The response of the dispose savepoint request to JobManager.
- */
-public abstract class DisposeSavepointResponse implements Serializable {
-
-	private static final long serialVersionUID = 6008792963949369567L;
-
-	public static class Success extends DisposeSavepointResponse implements Serializable {
-
-		private static final long serialVersionUID = 1572462960008711415L;
-	}
-
-	public static class Failure extends DisposeSavepointResponse implements Serializable {
-
-		private static final long serialVersionUID = -7505308325483022458L;
-
-		private final Throwable cause;
-
-		public Failure(final Throwable cause) {
-			this.cause = cause;
-		}
-
-		public Throwable getCause() {
-			return cause;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
deleted file mode 100644
index 0b0edc5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.message;
-
-import org.apache.flink.api.common.JobID;
-
-import java.io.Serializable;
-
-/**
- * The response of the trigger savepoint request to JobManager.
- */
-public abstract class TriggerSavepointResponse implements Serializable {
-
-	private static final long serialVersionUID = 3139327824611807707L;
-
-	private final JobID jobID;
-
-	public JobID getJobID() {
-		return jobID;
-	}
-
-	public TriggerSavepointResponse(final JobID jobID) {
-		this.jobID = jobID;
-	}
-
-	public static class Success extends TriggerSavepointResponse implements Serializable {
-
-		private static final long serialVersionUID = -1100637460388881776L;
-
-		private final String savepointPath;
-
-		public Success(final JobID jobID, final String savepointPath) {
-			super(jobID);
-			this.savepointPath = savepointPath;
-		}
-
-		public String getSavepointPath() {
-			return savepointPath;
-		}
-	}
-
-	public static class Failure extends TriggerSavepointResponse implements Serializable {
-
-		private static final long serialVersionUID = -1668479003490615139L;
-
-		private final Throwable cause;
-
-		public Failure(final JobID jobID, final Throwable cause) {
-			super(jobID);
-			this.cause = cause;
-		}
-
-		public Throwable getCause() {
-			return cause;
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 2052f98..4b9100a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit;
 public interface RpcService {
 
 	/**
-	 * Return the address under which the rpc service can be reached. If the rpc service cannot be
-	 * contacted remotely, then it will return an empty string.
+	 * Return the hostname or host address under which the rpc service can be reached.
+	 * If the rpc service cannot be contacted remotely, then it will return an empty string.
 	 *
 	 * @return Address of the rpc service or empty string if local rpc service
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index 72668d2..1b311e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -26,11 +26,16 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.util.Preconditions;
 
+import java.util.UUID;
+
 /**
  * Container class for JobManager specific communication utils used by the {@link TaskExecutor}.
  */
 public class JobManagerConnection {
 
+	// Job master leader session id
+	private final UUID jobMasterLeaderId;
+
 	// Gateway to the job master
 	private final JobMasterGateway jobMasterGateway;
 
@@ -50,13 +55,15 @@ public class JobManagerConnection {
 	private final PartitionProducerStateChecker partitionStateChecker;
 
 	public JobManagerConnection(
-		JobMasterGateway jobMasterGateway,
-		TaskManagerActions taskManagerActions,
-		CheckpointResponder checkpointResponder,
-		LibraryCacheManager libraryCacheManager,
-		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
-		PartitionProducerStateChecker partitionStateChecker) {
-
+			UUID jobMasterLeaderId,
+			JobMasterGateway jobMasterGateway,
+			TaskManagerActions taskManagerActions,
+			CheckpointResponder checkpointResponder,
+			LibraryCacheManager libraryCacheManager,
+			ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
+			PartitionProducerStateChecker partitionStateChecker)
+	{
+		this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId);
 		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
 		this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
 		this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
@@ -65,6 +72,10 @@ public class JobManagerConnection {
 		this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
 	}
 
+	public UUID getJobMasterLeaderId() {
+		return jobMasterLeaderId;
+	}
+
 	public JobMasterGateway getJobManagerGateway() {
 		return jobMasterGateway;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 36f108e..2389291 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -18,37 +18,46 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
 import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
 import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
@@ -62,26 +71,16 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
-import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.RpcService;
-
 import org.apache.flink.util.Preconditions;
 
-import java.util.HashSet;
-import java.util.Set;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -292,6 +291,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 				tdd.getAttemptNumber());
 
 		InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
+				jobManagerConnection.getJobMasterLeaderId(),
 				jobManagerConnection.getJobManagerGateway(),
 				jobInformation.getJobId(),
 				taskInformation.getJobVertexId(),
@@ -605,10 +605,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		clearTasks();
 	}
 
-	private void updateTaskExecutionState(final JobMasterGateway jobMasterGateway, final TaskExecutionState taskExecutionState) {
+	private void updateTaskExecutionState(
+			final UUID jobMasterLeaderId,
+			final JobMasterGateway jobMasterGateway,
+			final TaskExecutionState taskExecutionState)
+	{
 		final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
 
-		Future<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState);
+		Future<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(
+				jobMasterLeaderId, taskExecutionState);
 
 		futureAcknowledge.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
 			@Override
@@ -620,7 +625,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}, getMainThreadExecutor());
 	}
 
-	private void unregisterTaskAndNotifyFinalState(final JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
+	private void unregisterTaskAndNotifyFinalState(
+			final UUID jobMasterLeaderId,
+			final JobMasterGateway jobMasterGateway,
+			final ExecutionAttemptID executionAttemptID)
+	{
 		Task task = removeTask(executionAttemptID);
 
 		if (task != null) {
@@ -638,14 +647,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			AccumulatorSnapshot accumulatorSnapshot = task.getAccumulatorRegistry().getSnapshot();
 
 			updateTaskExecutionState(
-				jobMasterGateway,
-				new TaskExecutionState(
-					task.getJobID(),
-					task.getExecutionId(),
-					task.getExecutionState(),
-					task.getFailureCause(),
-					accumulatorSnapshot,
-					task.getMetricGroup().getIOMetricGroup().createSnapshot()));
+					jobMasterLeaderId,
+					jobMasterGateway,
+					new TaskExecutionState(
+							task.getJobID(),
+							task.getExecutionId(),
+							task.getExecutionState(),
+							task.getFailureCause(),
+							accumulatorSnapshot,
+							task.getMetricGroup().getIOMetricGroup().createSnapshot()));
 		} else {
 			log.error("Cannot find task with ID {} to unregister.", executionAttemptID);
 		}
@@ -687,11 +697,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
-	private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, int blobPort) {
+	private JobManagerConnection associateWithJobManager(UUID jobMasterLeaderId,
+			JobMasterGateway jobMasterGateway, int blobPort)
+	{
+		Preconditions.checkNotNull(jobMasterLeaderId);
 		Preconditions.checkNotNull(jobMasterGateway);
 		Preconditions.checkArgument(blobPort > 0 || blobPort <= 65535, "Blob port is out of range.");
 
-		TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
+		TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterLeaderId, jobMasterGateway);
 
 		CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
 
@@ -704,19 +717,21 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			taskManagerConfiguration.getCleanupInterval());
 
 		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
-			jobMasterGateway,
-			getRpcService().getExecutor(),
-			taskManagerConfiguration.getTimeout());
+				jobMasterLeaderId,
+				jobMasterGateway,
+				getRpcService().getExecutor(),
+				taskManagerConfiguration.getTimeout());
 
-		PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway);
+		PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterLeaderId, jobMasterGateway);
 
 		return new JobManagerConnection(
-			jobMasterGateway,
-			taskManagerActions,
-			checkpointResponder,
-			libraryCacheManager,
-			resultPartitionConsumableNotifier,
-			partitionStateChecker);
+				jobMasterLeaderId,
+				jobMasterGateway,
+				taskManagerActions,
+				checkpointResponder,
+				libraryCacheManager,
+				resultPartitionConsumableNotifier,
+				partitionStateChecker);
 	}
 
 	private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException {
@@ -808,9 +823,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	}
 
 	private class TaskManagerActionsImpl implements TaskManagerActions {
+		private final UUID jobMasterLeaderId;
 		private final JobMasterGateway jobMasterGateway;
 
-		private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
+		private TaskManagerActionsImpl(UUID jobMasterLeaderId, JobMasterGateway jobMasterGateway) {
+			this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId);
 			this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
 		}
 
@@ -819,7 +836,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			runAsync(new Runnable() {
 				@Override
 				public void run() {
-					unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID);
+					unregisterTaskAndNotifyFinalState(jobMasterLeaderId, jobMasterGateway, executionAttemptID);
 				}
 			});
 		}
@@ -842,7 +859,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 		@Override
 		public void updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
-			TaskExecutor.this.updateTaskExecutionState(jobMasterGateway, taskExecutionState);
+			TaskExecutor.this.updateTaskExecutionState(jobMasterLeaderId, jobMasterGateway, taskExecutionState);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6486067/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
index 4850d63..3b9da48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -31,7 +31,10 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
+import java.util.UUID;
+
 public class RpcInputSplitProvider implements InputSplitProvider {
+	private final UUID jobMasterLeaderId;
 	private final JobMasterGateway jobMasterGateway;
 	private final JobID jobID;
 	private final JobVertexID jobVertexID;
@@ -39,11 +42,13 @@ public class RpcInputSplitProvider implements InputSplitProvider {
 	private final Time timeout;
 
 	public RpcInputSplitProvider(
+			UUID jobMasterLeaderId,
 			JobMasterGateway jobMasterGateway,
 			JobID jobID,
 			JobVertexID jobVertexID,
 			ExecutionAttemptID executionAttemptID,
 			Time timeout) {
+		this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId);
 		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
 		this.jobID = Preconditions.checkNotNull(jobID);
 		this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
@@ -56,7 +61,8 @@ public class RpcInputSplitProvider implements InputSplitProvider {
 	public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
 		Preconditions.checkNotNull(userCodeClassLoader);
 
-		Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(jobVertexID, executionAttemptID);
+		Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
+				jobMasterLeaderId, jobVertexID, executionAttemptID);
 
 		try {
 			SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());