You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/13 15:40:12 UTC

[2/8] flink git commit: [FLINK-4375] [distributed coordination] Implement new JobManager creation, initialization, and basic RPC methods

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 3b8fc97..d9ff88f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,38 +18,31 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -59,18 +52,13 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
-import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
-import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
-import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
@@ -83,9 +71,12 @@ import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
@@ -93,15 +84,13 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.duration.FiniteDuration;
-
+import javax.annotation.Nullable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -112,16 +101,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * It offers the following methods as part of its rpc interface to interact with the JobMaster
  * remotely:
  * <ul>
- * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
+ * <li>{@link #updateTaskExecutionState} updates the task execution state for
  * given task</li>
  * </ul>
  */
 public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
+	private static final AtomicReferenceFieldUpdater<JobMaster, UUID> LEADER_ID_UPDATER =
+			AtomicReferenceFieldUpdater.newUpdater(JobMaster.class, UUID.class, "leaderSessionID");
+
+	// ------------------------------------------------------------------------
+
 	/** Logical representation of the job */
 	private final JobGraph jobGraph;
 
-	/** Configuration of the job */
+	/** Configuration of the JobManager */
 	private final Configuration configuration;
 
 	/** Service to contend for and retrieve the leadership of JM and RM */
@@ -130,37 +124,24 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** Blob cache manager used across jobs */
 	private final BlobLibraryCacheManager libraryCacheManager;
 
-	/** Factory to create restart strategy for this job */
-	private final RestartStrategyFactory restartStrategyFactory;
-
-	/** Store for save points */
-	private final SavepointStore savepointStore;
-
-	/** The timeout for this job */
-	private final Time timeout;
-
-	/** The scheduler to use for scheduling new tasks as they are needed */
-	private final Scheduler scheduler;
+	/** The metrics for the JobManager itself */
+	private final MetricGroup jobManagerMetricGroup;
 
-	/** The metrics group used across jobs */
-	private final JobManagerMetricGroup jobManagerMetricGroup;
+	/** The metrics for the job */
+	private final MetricGroup jobMetricGroup;
 
 	/** The execution context which is used to execute futures */
-	private final Executor executionContext;
+	private final ExecutorService executionContext;
 
 	private final OnCompletionActions jobCompletionActions;
 
-	/** The execution graph of this job */
-	private volatile ExecutionGraph executionGraph;
-
-	/** The checkpoint recovery factory used by this job */
-	private CheckpointRecoveryFactory checkpointRecoveryFactory;
+	private final FatalErrorHandler errorHandler;
 
-	private ClassLoader userCodeLoader;
+	private final ClassLoader userCodeLoader;
 
-	private RestartStrategy restartStrategy;
+	/** The execution graph of this job */
+	private final ExecutionGraph executionGraph;
 
-	private MetricGroup jobMetrics;
 
 	private volatile UUID leaderSessionID;
 
@@ -170,22 +151,27 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	private LeaderRetrievalService resourceManagerLeaderRetriever;
 
 	/** Connection with ResourceManager, null if not located address yet or we close it initiative */
-	private volatile ResourceManagerConnection resourceManagerConnection;
+	private ResourceManagerConnection resourceManagerConnection;
+
+	// TODO - we need to replace this with the slot pool
+	private final Scheduler scheduler;
 
 	// ------------------------------------------------------------------------
 
 	public JobMaster(
-		JobGraph jobGraph,
-		Configuration configuration,
-		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityService,
-		BlobLibraryCacheManager libraryCacheManager,
-		RestartStrategyFactory restartStrategyFactory,
-		SavepointStore savepointStore,
-		Time timeout,
-		Scheduler scheduler,
-		JobManagerMetricGroup jobManagerMetricGroup,
-		OnCompletionActions jobCompletionActions)
+			JobGraph jobGraph,
+			Configuration configuration,
+			RpcService rpcService,
+			HighAvailabilityServices highAvailabilityService,
+			ExecutorService executorService,
+			BlobLibraryCacheManager libraryCacheManager,
+			RestartStrategyFactory restartStrategyFactory,
+			SavepointStore savepointStore,
+			Time rpcAskTimeout,
+			@Nullable JobManagerMetricGroup jobManagerMetricGroup,
+			OnCompletionActions jobCompletionActions,
+			FatalErrorHandler errorHandler,
+			ClassLoader userCodeLoader) throws Exception
 	{
 		super(rpcService);
 
@@ -193,289 +179,150 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		this.configuration = checkNotNull(configuration);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
 		this.libraryCacheManager = checkNotNull(libraryCacheManager);
-		this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
-		this.savepointStore = checkNotNull(savepointStore);
-		this.timeout = checkNotNull(timeout);
-		this.scheduler = checkNotNull(scheduler);
-		this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
-		this.executionContext = checkNotNull(rpcService.getExecutor());
+		this.executionContext = checkNotNull(executorService);
 		this.jobCompletionActions = checkNotNull(jobCompletionActions);
-	}
+		this.errorHandler = checkNotNull(errorHandler);
+		this.userCodeLoader = checkNotNull(userCodeLoader);
 
-	//----------------------------------------------------------------------------------------------
-	// Lifecycle management
-	//----------------------------------------------------------------------------------------------
-
-	/**
-	 * Initializing the job execution environment, should be called before start. Any error occurred during
-	 * initialization will be treated as job submission failure.
-	 *
-	 * @throws JobSubmissionException
-	 */
-	public void init() throws JobSubmissionException {
-		log.info("Initializing job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
-
-		try {
-			// IMPORTANT: We need to make sure that the library registration is the first action,
-			// because this makes sure that the uploaded jar files are removed in case of
-			// unsuccessful
-			try {
-				libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(),
-					jobGraph.getClasspaths());
-			} catch (Throwable t) {
-				throw new JobSubmissionException(jobGraph.getJobID(),
-					"Cannot set up the user code libraries: " + t.getMessage(), t);
-			}
+		final String jobName = jobGraph.getName();
+		final JobID jid = jobGraph.getJobID();
 
-			userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
-			if (userCodeLoader == null) {
-				throw new JobSubmissionException(jobGraph.getJobID(),
-					"The user code class loader could not be initialized.");
-			}
+		if (jobManagerMetricGroup != null) {
+			this.jobManagerMetricGroup = jobManagerMetricGroup;
+			this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph);
+		} else {
+			this.jobManagerMetricGroup = new UnregisteredMetricsGroup();
+			this.jobMetricGroup = new UnregisteredMetricsGroup();
+		}
 
-			if (jobGraph.getNumberOfVertices() == 0) {
-				throw new JobSubmissionException(jobGraph.getJobID(), "The given job is empty");
-			}
+		log.info("Initializing job {} ({}).", jobName, jid);
 
-			final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
+		final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
 				jobGraph.getSerializedExecutionConfig()
-					.deserializeValue(userCodeLoader)
-					.getRestartStrategy();
-			if (restartStrategyConfiguration != null) {
-				restartStrategy = RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration);
-			}
-			else {
-				restartStrategy = restartStrategyFactory.createRestartStrategy();
-			}
+						.deserializeValue(userCodeLoader)
+						.getRestartStrategy();
 
-			log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobGraph.getName(), jobGraph.getJobID());
+		final RestartStrategy restartStrategy = (restartStrategyConfiguration != null) ?
+				RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) :
+				restartStrategyFactory.createRestartStrategy();
 
-			if (jobManagerMetricGroup != null) {
-				jobMetrics = jobManagerMetricGroup.addJob(jobGraph);
-			}
-			if (jobMetrics == null) {
-				jobMetrics = new UnregisteredMetricsGroup();
-			}
+		log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid);
 
-			try {
-				checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
-			} catch (Exception e) {
-				log.error("Could not get the checkpoint recovery factory.", e);
-				throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint recovery factory.", e);
-			}
+		CheckpointRecoveryFactory checkpointRecoveryFactory;
+		try {
+			checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
+		} catch (Exception e) {
+			log.error("Could not create the access to highly-available checkpoint storage.", e);
+			throw new Exception("Could not create the access to highly-available checkpoint storage.", e);
+		}
 
-			try {
-				resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
-			} catch (Exception e) {
-				log.error("Could not get the resource manager leader retriever.", e);
-				throw new JobSubmissionException(jobGraph.getJobID(),
+		try {
+			resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
+		} catch (Exception e) {
+			log.error("Could not get the resource manager leader retriever.", e);
+			throw new JobSubmissionException(jobGraph.getJobID(),
 					"Could not get the resource manager leader retriever.", e);
-			}
-		} catch (Throwable t) {
-			log.error("Failed to initializing job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
+		}
 
-			libraryCacheManager.unregisterJob(jobGraph.getJobID());
+		this.executionGraph = ExecutionGraphBuilder.buildGraph(
+				null,
+				jobGraph,
+				configuration,
+				executorService,
+				userCodeLoader,
+				checkpointRecoveryFactory,
+				savepointStore,
+				rpcAskTimeout,
+				restartStrategy,
+				jobMetricGroup,
+				-1,
+				log);
 
-			if (t instanceof JobSubmissionException) {
-				throw (JobSubmissionException) t;
-			}
-			else {
-				throw new JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " +
-					jobGraph.getName() + " (" + jobGraph.getJobID() + ")", t);
-			}
-		}
+		// TODO - temp fix
+		this.scheduler = new Scheduler(executorService);
 	}
 
+	//----------------------------------------------------------------------------------------------
+	// Lifecycle management
+	//----------------------------------------------------------------------------------------------
+
+
 	@Override
 	public void start() {
-		super.start();
+		throw new UnsupportedOperationException("Should never call start() without leader ID");
 	}
 
+	/**
+	 * Start the rpc service and begin to run the job.
+	 *
+	 * @param leaderSessionID The necessary leader id for running the job.
+	 */
+	public void start(final UUID leaderSessionID) {
+		if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
+			super.start();
+
+			log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID());
+			getSelf().startJobExecution();
+		} else {
+			log.warn("Job already started with leaderId {}, ignoring this start request.", leaderSessionID);
+		}
+	}
+
+	/**
+	 * Suspend the job and shut down all other services including rpc.
+	 */
 	@Override
 	public void shutDown() {
+		// make sure there is a graceful exit
+		getSelf().suspendExecution(new Exception("JobManager is shutting down."));
 		super.shutDown();
-
-		suspendJob(new Exception("JobManager is shutting down."));
-
-		disposeCommunicationWithResourceManager();
 	}
 
-
-
 	//----------------------------------------------------------------------------------------------
 	// RPC methods
 	//----------------------------------------------------------------------------------------------
 
-	/**
-	 * Start to run the job, runtime data structures like ExecutionGraph will be constructed now and checkpoint
-	 * being recovered. After this, we will begin to schedule the job.
-	 */
+	//-- job starting and stopping  -----------------------------------------------------------------
+
 	@RpcMethod
-	public void startJob(final UUID leaderSessionID) {
-		log.info("Starting job {} ({}) with leaderId {}.", jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
-
-		this.leaderSessionID = leaderSessionID;
-
-		if (executionGraph != null) {
-			executionGraph = new ExecutionGraph(
-				ExecutionContext$.MODULE$.fromExecutor(executionContext),
-				jobGraph.getJobID(),
-				jobGraph.getName(),
-				jobGraph.getJobConfiguration(),
-				jobGraph.getSerializedExecutionConfig(),
-				new FiniteDuration(timeout.getSize(), timeout.getUnit()),
-				restartStrategy,
-				jobGraph.getUserJarBlobKeys(),
-				jobGraph.getClasspaths(),
-				userCodeLoader,
-				jobMetrics);
-		}
-		else {
-			// TODO: update last active time in JobInfo
-		}
+	public void startJobExecution() {
+		log.info("Starting execution of job {} ({}) with leaderId {}.",
+				jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
 
 		try {
-			executionGraph.setScheduleMode(jobGraph.getScheduleMode());
-			executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
-
-			try {
-				executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
-			} catch (Exception e) {
-				log.warn("Cannot create JSON plan for job {} ({})", jobGraph.getJobID(), jobGraph.getName(), e);
-				executionGraph.setJsonPlan("{}");
-			}
-
-			// initialize the vertices that have a master initialization hook
-			// file output formats create directories here, input formats create splits
-			if (log.isDebugEnabled()) {
-				log.debug("Running initialization on master for job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
-			}
-			for (JobVertex vertex : jobGraph.getVertices()) {
-				final String executableClass = vertex.getInvokableClassName();
-				if (executableClass == null || executableClass.length() == 0) {
-					throw new JobExecutionException(jobGraph.getJobID(),
-						"The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
-				}
-				if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
-					vertex.setParallelism(scheduler.getTotalNumberOfSlots());
-				}
-
-				try {
-					vertex.initializeOnMaster(userCodeLoader);
-				} catch (Throwable t) {
-					throw new JobExecutionException(jobGraph.getJobID(),
-						"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
-				}
-			}
-
-			// topologically sort the job vertices and attach the graph to the existing one
-			final List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
-			if (log.isDebugEnabled()) {
-				log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(),
-					jobGraph.getJobID(), jobGraph.getName());
-			}
-			executionGraph.attachJobGraph(sortedTopology);
-
-			if (log.isDebugEnabled()) {
-				log.debug("Successfully created execution graph from job graph {} ({}).",
-					jobGraph.getJobID(), jobGraph.getName());
-			}
-
-			final JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings();
-			if (snapshotSettings != null) {
-				List<ExecutionJobVertex> triggerVertices = getExecutionJobVertexWithId(
-					executionGraph, snapshotSettings.getVerticesToTrigger());
-
-				List<ExecutionJobVertex> ackVertices = getExecutionJobVertexWithId(
-					executionGraph, snapshotSettings.getVerticesToAcknowledge());
-
-				List<ExecutionJobVertex> confirmVertices = getExecutionJobVertexWithId(
-					executionGraph, snapshotSettings.getVerticesToConfirm());
-
-				CompletedCheckpointStore completedCheckpoints = checkpointRecoveryFactory.createCheckpointStore(
-					jobGraph.getJobID(), userCodeLoader);
-
-				CheckpointIDCounter checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(
-					jobGraph.getJobID());
-
-				// Checkpoint stats tracker
-				boolean isStatsDisabled = configuration.getBoolean(
-					ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
-					ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
-
-				final CheckpointStatsTracker checkpointStatsTracker;
-				if (isStatsDisabled) {
-					checkpointStatsTracker = new DisabledCheckpointStatsTracker();
-				}
-				else {
-					int historySize = configuration.getInteger(
-						ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-						ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
-					checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics);
-				}
-
-				executionGraph.enableSnapshotCheckpointing(
-					snapshotSettings.getCheckpointInterval(),
-					snapshotSettings.getCheckpointTimeout(),
-					snapshotSettings.getMinPauseBetweenCheckpoints(),
-					snapshotSettings.getMaxConcurrentCheckpoints(),
-					triggerVertices,
-					ackVertices,
-					confirmVertices,
-					checkpointIdCounter,
-					completedCheckpoints,
-					savepointStore,
-					checkpointStatsTracker);
-			}
-
-			// TODO: register this class to execution graph as job status change listeners
-
-			// TODO: register client as job / execution status change listeners if they are interested
-
-			/*
-			TODO: decide whether we should take the savepoint before recovery
-
-			if (isRecovery) {
-				// this is a recovery of a master failure (this master takes over)
-				executionGraph.restoreLatestCheckpointedState();
-			} else {
-				if (snapshotSettings != null) {
-					String savepointPath = snapshotSettings.getSavepointPath();
-					if (savepointPath != null) {
-						// got a savepoint
-						log.info("Starting job from savepoint {}.", savepointPath);
-
-						// load the savepoint as a checkpoint into the system
-						final CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
-							jobGraph.getJobID(), executionGraph.getAllVertices(), savepointStore, savepointPath);
-						executionGraph.getCheckpointCoordinator().getCheckpointStore().addCheckpoint(savepoint);
-
-						// Reset the checkpoint ID counter
-						long nextCheckpointId = savepoint.getCheckpointID() + 1;
-						log.info("Reset the checkpoint ID to " + nextCheckpointId);
-						executionGraph.getCheckpointCoordinator().getCheckpointIdCounter().setCount(nextCheckpointId);
-
-						executionGraph.restoreLatestCheckpointedState();
-					}
+			// register self as job status change listener
+			executionGraph.registerJobStatusListener(new JobStatusListener() {
+				@Override
+				public void jobStatusChanges(
+						final JobID jobId, final JobStatus newJobStatus, final long timestamp, final Throwable error)
+				{
+					// run in rpc thread to avoid concurrency
+					runAsync(new Runnable() {
+						@Override
+						public void run() {
+							jobStatusChanged(newJobStatus, timestamp, error);
+						}
+					});
 				}
-			}
-			*/
+			});
 
-			// job is good to go, try to locate resource manager's address
+			// job is ready to go, try to establish connection with resource manager
 			resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
 		} catch (Throwable t) {
+
+			// TODO - this should not result in a job failure, but another leader should take over
+			// TODO - either this master should retry the execution, or it should relinquish leadership / terminate
+
 			log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
 
 			executionGraph.fail(t);
-			executionGraph = null;
 
-			final Throwable rt;
+			final JobExecutionException rt;
 			if (t instanceof JobExecutionException) {
 				rt = (JobExecutionException) t;
-			}
-			else {
+			} else {
 				rt = new JobExecutionException(jobGraph.getJobID(),
-					"Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
+						"Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
 			}
 
 			// TODO: notify client about this failure
@@ -488,34 +335,51 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		executionContext.execute(new Runnable() {
 			@Override
 			public void run() {
-				if (executionGraph != null) {
-					try {
-						executionGraph.scheduleForExecution(scheduler);
-					} catch (Throwable t) {
-						executionGraph.fail(t);
-					}
+				try {
+					executionGraph.scheduleForExecution(scheduler);
+				} catch (Throwable t) {
+					executionGraph.fail(t);
 				}
 			}
 		});
 	}
 
 	/**
-	 * Suspending job, all the running tasks will be cancelled, and runtime data will be cleared.
+	 * Suspends the job: all running tasks will be cancelled, and communication with other components
+	 * will be disposed.
+	 *
+	 * <p>Mostly the job is suspended because the leadership has been revoked; one can restart this job by
+	 * calling the {@link #start(UUID)} method once we take the leadership back again.
 	 *
 	 * @param cause The reason of why this job been suspended.
 	 */
 	@RpcMethod
-	public void suspendJob(final Throwable cause) {
+	public void suspendExecution(final Throwable cause) {
+		if (leaderSessionID == null) {
+			log.debug("Job has already been suspended or shutdown.");
+			return;
+		}
+
+		// receive no more messages until started again, should be called before we clear self leader id
+		((StartStoppable) getSelf()).stop();
+
 		leaderSessionID = null;
+		executionGraph.suspend(cause);
 
-		if (executionGraph != null) {
-			executionGraph.suspend(cause);
-			executionGraph = null;
+		// disconnect from resource manager:
+		try {
+			resourceManagerLeaderRetriever.stop();
+		} catch (Exception e) {
+			log.warn("Failed to stop resource manager leader retriever when suspending.");
 		}
+		closeResourceManagerConnection();
+
+		// TODO: disconnect from all registered task managers
 
-		disposeCommunicationWithResourceManager();
 	}
 
+	//----------------------------------------------------------------------------------------------
+
 	/**
 	 * Updates the task execution state for a given task.
 	 *
@@ -523,24 +387,38 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 * @return Acknowledge the task execution state update
 	 */
 	@RpcMethod
-	public Acknowledge updateTaskExecutionState(final TaskExecutionState taskExecutionState) throws ExecutionGraphException {
+	public Acknowledge updateTaskExecutionState(
+			final UUID leaderSessionID,
+			final TaskExecutionState taskExecutionState) throws Exception
+	{
 		if (taskExecutionState == null) {
 			throw new NullPointerException("TaskExecutionState must not be null.");
 		}
 
+		if (!this.leaderSessionID.equals(leaderSessionID)) {
+			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+					+ ", actual: " + leaderSessionID);
+		}
+
 		if (executionGraph.updateState(taskExecutionState)) {
 			return Acknowledge.get();
 		} else {
 			throw new ExecutionGraphException("The execution attempt " +
-				taskExecutionState.getID() + " was not found.");
+					taskExecutionState.getID() + " was not found.");
 		}
 	}
 
 	@RpcMethod
 	public SerializedInputSplit requestNextInputSplit(
-		final JobVertexID vertexID,
-		final ExecutionAttemptID executionAttempt) throws Exception
+			final UUID leaderSessionID,
+			final JobVertexID vertexID,
+			final ExecutionAttemptID executionAttempt) throws Exception
 	{
+		if (!this.leaderSessionID.equals(leaderSessionID)) {
+			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+					+ ", actual: " + leaderSessionID);
+		}
+
 		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
 		if (execution == null) {
 			// can happen when JobManager had already unregistered this execution upon on task failure,
@@ -579,7 +457,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		} catch (Exception ex) {
 			log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
 			IOException reason = new IOException("Could not serialize the next input split of class " +
-				nextInputSplit.getClass() + ".", ex);
+					nextInputSplit.getClass() + ".", ex);
 			vertex.fail(reason);
 			throw reason;
 		}
@@ -587,17 +465,31 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	@RpcMethod
 	public PartitionState requestPartitionState(
-		final ResultPartitionID partitionId,
-		final ExecutionAttemptID taskExecutionId,
-		final IntermediateDataSetID taskResultId)
+			final UUID leaderSessionID,
+			final ResultPartitionID partitionId,
+			final ExecutionAttemptID taskExecutionId,
+			final IntermediateDataSetID taskResultId) throws Exception
 	{
+		if (!this.leaderSessionID.equals(leaderSessionID)) {
+			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+					+ ", actual: " + leaderSessionID);
+		}
+
 		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
 		final ExecutionState state = execution != null ? execution.getState() : null;
 		return new PartitionState(taskResultId, partitionId.getPartitionId(), state);
 	}
 
 	@RpcMethod
-	public Acknowledge scheduleOrUpdateConsumers(final ResultPartitionID partitionID) {
+	public Acknowledge scheduleOrUpdateConsumers(
+			final UUID leaderSessionID,
+			final ResultPartitionID partitionID) throws Exception
+	{
+		if (!this.leaderSessionID.equals(leaderSessionID)) {
+			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+					+ ", actual: " + leaderSessionID);
+		}
+
 		executionGraph.scheduleOrUpdateConsumers(partitionID);
 		return Acknowledge.get();
 	}
@@ -609,223 +501,153 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	@RpcMethod
 	public void acknowledgeCheckpoint(
-		JobID jobID,
-		ExecutionAttemptID executionAttemptID,
-		long checkpointID,
-		CheckpointStateHandles checkpointStateHandles,
-		long synchronousDurationMillis,
-		long asynchronousDurationMillis,
-		long bytesBufferedInAlignment,
-		long alignmentDurationNanos) {
-		throw new UnsupportedOperationException();
-	}
-
-	@RpcMethod
-	public void declineCheckpoint(
-		JobID jobID,
-		ExecutionAttemptID executionAttemptID,
-		long checkpointID,
-		long checkpointTimestamp) {
-		throw new UnsupportedOperationException();
-	}
-
-	@RpcMethod
-	public void resourceRemoved(final ResourceID resourceId, final String message) {
-		// TODO: remove resource from slot pool
-	}
-
-	@RpcMethod
-	public void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge) {
-		if (executionGraph != null) {
-			final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-			if (checkpointCoordinator != null) {
-				getRpcService().execute(new Runnable() {
-					@Override
-					public void run() {
-						try {
-							if (!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) {
-								log.info("Received message for non-existing checkpoint {}.",
+			final JobID jobID,
+			final ExecutionAttemptID executionAttemptID,
+			final long checkpointID,
+			final CheckpointStateHandles checkpointStateHandles,
+			final long synchronousDurationMillis,
+			final long asynchronousDurationMillis,
+			final long bytesBufferedInAlignment,
+			final long alignmentDurationNanos)
+	{
+		final AcknowledgeCheckpoint acknowledge = new AcknowledgeCheckpoint(
+				jobID,
+				executionAttemptID,
+				checkpointID,
+				checkpointStateHandles,
+				synchronousDurationMillis,
+				asynchronousDurationMillis,
+				bytesBufferedInAlignment,
+				alignmentDurationNanos);
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+
+		if (checkpointCoordinator != null) {
+			getRpcService().execute(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						if (!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) {
+							log.info("Received message for non-existing checkpoint {}.",
 									acknowledge.getCheckpointId());
-							}
-						} catch (Exception e) {
-							log.error("Error in CheckpointCoordinator while processing {}", acknowledge, e);
 						}
+					} catch (Exception e) {
+						log.error("Error in CheckpointCoordinator while processing {}", acknowledge, e);
 					}
-				});
-			}
-			else {
-				log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
-					jobGraph.getJobID());
-			}
+				}
+			});
 		} else {
-			log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID());
+			log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
+					jobGraph.getJobID());
 		}
 	}
 
 	@RpcMethod
-	public void declineCheckpoint(final DeclineCheckpoint decline) {
-		if (executionGraph != null) {
-			final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-			if (checkpointCoordinator != null) {
-				getRpcService().execute(new Runnable() {
-					@Override
-					public void run() {
-						try {
-							if (!checkpointCoordinator.receiveDeclineMessage(decline)) {
-								log.info("Received message for non-existing checkpoint {}.", decline.getCheckpointId());
-							}
-						} catch (Exception e) {
-							log.error("Error in CheckpointCoordinator while processing {}", decline, e);
+	public void declineCheckpoint(
+			final JobID jobID,
+			final ExecutionAttemptID executionAttemptID,
+			final long checkpointID,
+			final long checkpointTimestamp)
+	{
+		final DeclineCheckpoint decline = new DeclineCheckpoint(
+				jobID, executionAttemptID, checkpointID, checkpointTimestamp);
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+
+		if (checkpointCoordinator != null) {
+			getRpcService().execute(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						if (!checkpointCoordinator.receiveDeclineMessage(decline)) {
+							log.info("Received message for non-existing checkpoint {}.", decline.getCheckpointId());
 						}
+					} catch (Exception e) {
+						log.error("Error in CheckpointCoordinator while processing {}", decline, e);
 					}
-				});
-			} else {
-				log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator",
-					jobGraph.getJobID());
-			}
+				}
+			});
 		} else {
-			log.error("Received AcknowledgeCheckpoint for unavailable job {}", jobGraph.getJobID());
+			log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator",
+					jobGraph.getJobID());
 		}
 	}
 
 	@RpcMethod
 	public KvStateLocation lookupKvStateLocation(final String registrationName) throws Exception {
-		if (executionGraph != null) {
-			if (log.isDebugEnabled()) {
-				log.debug("Lookup key-value state for job {} with registration " +
+		if (log.isDebugEnabled()) {
+			log.debug("Lookup key-value state for job {} with registration " +
 					"name {}.", jobGraph.getJobID(), registrationName);
-			}
+		}
 
-			final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
-			final KvStateLocation location = registry.getKvStateLocation(registrationName);
-			if (location != null) {
-				return location;
-			} else {
-				throw new UnknownKvStateLocation(registrationName);
-			}
+		final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry();
+		final KvStateLocation location = registry.getKvStateLocation(registrationName);
+		if (location != null) {
+			return location;
 		} else {
-			throw new IllegalStateException("Received lookup KvState location request for unavailable job " +
-				jobGraph.getJobID());
+			throw new UnknownKvStateLocation(registrationName);
 		}
 	}
 
 	@RpcMethod
 	public void notifyKvStateRegistered(
-		final JobVertexID jobVertexId,
-		final KeyGroupRange keyGroupRange,
-		final String registrationName,
-		final KvStateID kvStateId,
-		final KvStateServerAddress kvStateServerAddress)
+			final JobVertexID jobVertexId,
+			final KeyGroupRange keyGroupRange,
+			final String registrationName,
+			final KvStateID kvStateId,
+			final KvStateServerAddress kvStateServerAddress)
 	{
-		if (executionGraph != null) {
-			if (log.isDebugEnabled()) {
-				log.debug("Key value state registered for job {} under name {}.",
+		if (log.isDebugEnabled()) {
+			log.debug("Key value state registered for job {} under name {}.",
 					jobGraph.getJobID(), registrationName);
-			}
-			try {
-				executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
-					jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress
-				);
-			} catch (Exception e) {
-				log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
-			}
-		} else {
-			log.error("Received notify KvState registered request for unavailable job " + jobGraph.getJobID());
+		}
+
+		try {
+			executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
+					jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
+		} catch (Exception e) {
+			log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
 		}
 	}
 
 	@RpcMethod
 	public void notifyKvStateUnregistered(
-		JobVertexID jobVertexId,
-		KeyGroupRange keyGroupRange,
-		String registrationName)
+			JobVertexID jobVertexId,
+			KeyGroupRange keyGroupRange,
+			String registrationName)
 	{
-		if (executionGraph != null) {
-			if (log.isDebugEnabled()) {
-				log.debug("Key value state unregistered for job {} under name {}.",
+		if (log.isDebugEnabled()) {
+			log.debug("Key value state unregistered for job {} under name {}.",
 					jobGraph.getJobID(), registrationName);
-			}
-			try {
-				executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
-					jobVertexId, keyGroupRange, registrationName
-				);
-			} catch (Exception e) {
-				log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
-			}
-		} else {
-			log.error("Received notify KvState unregistered request for unavailable job " + jobGraph.getJobID());
 		}
-	}
 
-	@RpcMethod
-	public Future<TriggerSavepointResponse> triggerSavepoint() throws Exception {
-		if (executionGraph != null) {
-			final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
-			if (checkpointCoordinator != null) {
-				try {
-					Future<String> savepointFuture = new FlinkFuture<>(
-						checkpointCoordinator.triggerSavepoint(System.currentTimeMillis()));
-
-					return savepointFuture.handleAsync(new BiFunction<String, Throwable, TriggerSavepointResponse>() {
-						@Override
-						public TriggerSavepointResponse apply(String savepointPath, Throwable throwable) {
-							if (throwable == null) {
-								return new TriggerSavepointResponse.Success(jobGraph.getJobID(), savepointPath);
-							}
-							else {
-								return new TriggerSavepointResponse.Failure(jobGraph.getJobID(),
-									new Exception("Failed to complete savepoint", throwable));
-							}
-						}
-					}, getMainThreadExecutor());
-
-				} catch (Exception e) {
-					FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>();
-					future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(),
-						new Exception("Failed to trigger savepoint", e)));
-					return future;
-				}
-			} else {
-				FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>();
-				future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(),
-					new IllegalStateException("Checkpointing disabled. You can enable it via the execution " +
-						"environment of your job.")));
-				return future;
-			}
-		} else {
-			FlinkCompletableFuture<TriggerSavepointResponse> future = new FlinkCompletableFuture<>();
-			future.complete(new TriggerSavepointResponse.Failure(jobGraph.getJobID(),
-				new IllegalArgumentException("Received trigger savepoint request for unavailable job " +
-					jobGraph.getJobID())));
-			return future;
+		try {
+			executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
+					jobVertexId, keyGroupRange, registrationName);
+		} catch (Exception e) {
+			log.error("Failed to notify KvStateRegistry about registration {}.", registrationName);
 		}
 	}
 
 	@RpcMethod
-	public DisposeSavepointResponse disposeSavepoint(final String savepointPath) {
-		try {
-			log.info("Disposing savepoint at {}.", savepointPath);
-
-			// check whether the savepoint exists
-			savepointStore.loadSavepoint(savepointPath);
+	public Future<String> triggerSavepoint(final UUID leaderSessionID) throws Exception {
+		if (!this.leaderSessionID.equals(leaderSessionID)) {
+			throw new Exception("Leader id not match, expected: " + this.leaderSessionID
+					+ ", actual: " + leaderSessionID);
+		}
 
-			savepointStore.disposeSavepoint(savepointPath);
-			return new DisposeSavepointResponse.Success();
-		} catch (Exception e) {
-			log.error("Failed to dispose savepoint at {}.", savepointPath, e);
-			return new DisposeSavepointResponse.Failure(e);
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+		if (checkpointCoordinator != null) {
+			return new FlinkFuture<>(checkpointCoordinator.triggerSavepoint(System.currentTimeMillis()));
+		} else {
+			throw new IllegalStateException("Checkpointing disabled. You can enable it via the execution " +
+					"environment of your job.");
 		}
 	}
 
 	@RpcMethod
 	public ClassloadingProps requestClassloadingProps() throws Exception {
-		if (executionGraph != null) {
-			return new ClassloadingProps(libraryCacheManager.getBlobServerPort(),
+		return new ClassloadingProps(libraryCacheManager.getBlobServerPort(),
 				executionGraph.getRequiredJarFiles(),
 				executionGraph.getRequiredClasspaths());
-		} else {
-			throw new Exception("Received classloading props request for unavailable job " + jobGraph.getJobID());
-		}
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -838,12 +660,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			public void run() {
 				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
 				shutDown();
-				jobCompletionActions.onFatalError(cause);
+				errorHandler.onFatalError(cause);
 			}
 		});
 	}
 
-	// TODO - wrap this as StatusListenerMessenger's callback with rpc main thread
 	private void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable error) {
 		final JobID jobID = executionGraph.getJobID();
 		final String jobName = executionGraph.getJobName();
@@ -871,36 +692,33 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			if (newJobStatus == JobStatus.FINISHED) {
 				try {
 					final Map<String, SerializedValue<Object>> accumulatorResults =
-						executionGraph.getAccumulatorsSerialized();
+							executionGraph.getAccumulatorsSerialized();
 					final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
-						jobID, 0, accumulatorResults // TODO get correct job duration
+							jobID, 0, accumulatorResults // TODO get correct job duration
 					);
 					jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
 				} catch (Exception e) {
 					log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
 					final JobExecutionException exception = new JobExecutionException(
-						jobID, "Failed to retrieve accumulator results.", e);
+							jobID, "Failed to retrieve accumulator results.", e);
 					// TODO should we also notify client?
 					jobCompletionActions.jobFailed(exception);
 				}
-			}
-			else if (newJobStatus == JobStatus.CANCELED) {
+			} else if (newJobStatus == JobStatus.CANCELED) {
 				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
 				final JobExecutionException exception = new JobExecutionException(
-					jobID, "Job was cancelled.", unpackedError);
+						jobID, "Job was cancelled.", unpackedError);
 				// TODO should we also notify client?
 				jobCompletionActions.jobFailed(exception);
-			}
-			else if (newJobStatus == JobStatus.FAILED) {
+			} else if (newJobStatus == JobStatus.FAILED) {
 				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
 				final JobExecutionException exception = new JobExecutionException(
-					jobID, "Job execution failed.", unpackedError);
+						jobID, "Job execution failed.", unpackedError);
 				// TODO should we also notify client?
 				jobCompletionActions.jobFailed(exception);
-			}
-			else {
+			} else {
 				final JobExecutionException exception = new JobExecutionException(
-					jobID, newJobStatus + " is not a terminal state.");
+						jobID, newJobStatus + " is not a terminal state.");
 				// TODO should we also notify client?
 				jobCompletionActions.jobFailed(exception);
 				throw new RuntimeException(exception);
@@ -909,7 +727,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	private void notifyOfNewResourceManagerLeader(
-		final String resourceManagerAddress, final UUID resourceManagerLeaderId)
+			final String resourceManagerAddress, final UUID resourceManagerLeaderId)
 	{
 		// IMPORTANT: executed by main thread to avoid concurrence
 		runAsync(new Runnable() {
@@ -918,17 +736,15 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				if (resourceManagerConnection != null) {
 					if (resourceManagerAddress != null) {
 						if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
-							&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
-						{
+								&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
 							// both address and leader id are not changed, we can keep the old connection
 							return;
 						}
 						log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-							resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
-					}
-					else {
+								resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+					} else {
 						log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
-							resourceManagerConnection.getTargetAddress());
+								resourceManagerConnection.getTargetAddress());
 					}
 				}
 
@@ -937,8 +753,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				if (resourceManagerAddress != null) {
 					log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
 					resourceManagerConnection = new ResourceManagerConnection(
-						log, jobGraph.getJobID(), leaderSessionID,
-						resourceManagerAddress, resourceManagerLeaderId, executionContext);
+							log, jobGraph.getJobID(), leaderSessionID,
+							resourceManagerAddress, resourceManagerLeaderId, executionContext);
 					resourceManagerConnection.start();
 				}
 			}
@@ -952,26 +768,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				// TODO - add tests for comment in https://github.com/apache/flink/pull/2565
 				// verify the response with current connection
 				if (resourceManagerConnection != null
-					&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
+						&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) {
 					log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
-						success.getResourceManagerLeaderId());
+							success.getResourceManagerLeaderId());
 				}
 			}
 		});
 	}
 
-	private void disposeCommunicationWithResourceManager() {
-		// 1. stop the leader retriever so we will not receiving updates anymore
-		try {
-			resourceManagerLeaderRetriever.stop();
-		} catch (Exception e) {
-			log.warn("Failed to stop resource manager leader retriever.");
-		}
-
-		// 2. close current connection with ResourceManager if exists
-		closeResourceManagerConnection();
-	}
-
 	private void closeResourceManagerConnection() {
 		if (resourceManagerConnection != null) {
 			resourceManagerConnection.close();
@@ -980,34 +784,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	//----------------------------------------------------------------------------------------------
-	// Helper methods
-	//----------------------------------------------------------------------------------------------
-
-	/**
-	 * Converts JobVertexIDs to corresponding ExecutionJobVertexes
-	 *
-	 * @param executionGraph The execution graph that holds the relationship
-	 * @param vertexIDs      The vertexIDs need to be converted
-	 * @return The corresponding ExecutionJobVertexes
-	 * @throws JobExecutionException
-	 */
-	private static List<ExecutionJobVertex> getExecutionJobVertexWithId(
-		final ExecutionGraph executionGraph, final List<JobVertexID> vertexIDs)
-		throws JobExecutionException
-	{
-		final List<ExecutionJobVertex> ret = new ArrayList<>(vertexIDs.size());
-		for (JobVertexID vertexID : vertexIDs) {
-			final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(vertexID);
-			if (executionJobVertex == null) {
-				throw new JobExecutionException(executionGraph.getJobID(),
-					"The snapshot checkpointing settings refer to non-existent vertex " + vertexID);
-			}
-			ret.add(executionJobVertex);
-		}
-		return ret;
-	}
-
-	//----------------------------------------------------------------------------------------------
 	// Utility classes
 	//----------------------------------------------------------------------------------------------
 
@@ -1024,19 +800,19 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	private class ResourceManagerConnection
-		extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess>
+			extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess>
 	{
 		private final JobID jobID;
 
 		private final UUID jobManagerLeaderID;
 
 		ResourceManagerConnection(
-			final Logger log,
-			final JobID jobID,
-			final UUID jobManagerLeaderID,
-			final String resourceManagerAddress,
-			final UUID resourceManagerLeaderID,
-			final Executor executor)
+				final Logger log,
+				final JobID jobID,
+				final UUID jobManagerLeaderID,
+				final String resourceManagerAddress,
+				final UUID resourceManagerLeaderID,
+				final Executor executor)
 		{
 			super(log, resourceManagerAddress, resourceManagerLeaderID, executor);
 			this.jobID = checkNotNull(jobID);
@@ -1046,12 +822,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		@Override
 		protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
 			return new RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess>(
-				log, getRpcService(), "ResourceManager", ResourceManagerGateway.class,
-				getTargetAddress(), getTargetLeaderId())
+					log, getRpcService(), "ResourceManager", ResourceManagerGateway.class,
+					getTargetAddress(), getTargetLeaderId())
 			{
 				@Override
 				protected Future<RegistrationResponse> invokeRegistration(ResourceManagerGateway gateway, UUID leaderId,
-					long timeoutMillis) throws Exception
+						long timeoutMillis) throws Exception
 				{
 					Time timeout = Time.milliseconds(timeoutMillis);
 					return gateway.registerJobMaster(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 4b51258..fbe3f74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -29,15 +31,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
-import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
-import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
-import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -49,52 +47,56 @@ import java.util.UUID;
  */
 public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 
-	/**
-	 * Starting the job under the given leader session ID.
-	 */
-	void startJob(final UUID leaderSessionID);
+	// ------------------------------------------------------------------------
+	//  Job start and stop methods
+	// ------------------------------------------------------------------------
 
-	/**
-	 * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared.
-	 * Should re-submit the job before restarting it.
-	 *
-	 * @param cause The reason of why this job been suspended.
-	 */
-	void suspendJob(final Throwable cause);
+	void startJobExecution();
+
+	void suspendExecution(Throwable cause);
+
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Updates the task execution state for a given task.
 	 *
+	 * @param leaderSessionID    The leader session id of the JobManager
 	 * @param taskExecutionState New task execution state for a given task
 	 * @return Future flag of the task execution state update result
 	 */
-	Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
+	Future<Acknowledge> updateTaskExecutionState(
+			final UUID leaderSessionID,
+			final TaskExecutionState taskExecutionState);
 
 	/**
 	 * Requesting next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender
 	 * as a {@link SerializedInputSplit} message.
 	 *
+	 * @param leaderSessionID  The leader session id of the JobManager
 	 * @param vertexID         The job vertex id
 	 * @param executionAttempt The execution attempt id
 	 * @return The future of the input split. If there is no further input split, will return an empty object.
 	 */
 	Future<SerializedInputSplit> requestNextInputSplit(
-		final JobVertexID vertexID,
-		final ExecutionAttemptID executionAttempt);
+			final UUID leaderSessionID,
+			final JobVertexID vertexID,
+			final ExecutionAttemptID executionAttempt);
 
 	/**
 	 * Requests the current state of the partition.
 	 * The state of a partition is currently bound to the state of the producing execution.
 	 *
+	 * @param leaderSessionID The leader session id of the JobManager
 	 * @param partitionId     The partition ID of the partition to request the state of.
 	 * @param taskExecutionId The execution attempt ID of the task requesting the partition state.
 	 * @param taskResultId    The input gate ID of the task requesting the partition state.
 	 * @return The future of the partition state
 	 */
 	Future<PartitionState> requestPartitionState(
-		final ResultPartitionID partitionId,
-		final ExecutionAttemptID taskExecutionId,
-		final IntermediateDataSetID taskResultId);
+			final UUID leaderSessionID,
+			final ResultPartitionID partitionId,
+			final ExecutionAttemptID taskExecutionId,
+			final IntermediateDataSetID taskResultId);
 
 	/**
 	 * Notifies the JobManager about available data for a produced partition.
@@ -105,11 +107,15 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * <p>
 	 * The JobManager then can decide when to schedule the partition consumers of the given session.
 	 *
-	 * @param partitionID The partition which has already produced data
-	 * @param timeout before the rpc call fails
+	 * @param leaderSessionID The leader session id of the JobManager
+	 * @param partitionID     The partition which has already produced data
+	 * @param timeout         Timeout after which the rpc call fails
 	 * @return Future acknowledge of the schedule or update operation
 	 */
-	Future<Acknowledge> scheduleOrUpdateConsumers(final ResultPartitionID partitionID, @RpcTimeout final Time timeout);
+	Future<Acknowledge> scheduleOrUpdateConsumers(
+			final UUID leaderSessionID,
+			final ResultPartitionID partitionID,
+			@RpcTimeout final Time timeout);
 
 	/**
 	 * Disconnects the given {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
@@ -118,31 +124,6 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param resourceID identifying the TaskManager to disconnect
 	 */
 	void disconnectTaskManager(ResourceID resourceID);
-	void scheduleOrUpdateConsumers(final ResultPartitionID partitionID);
-
-	/**
-	 * Notifies the JobManager about the removal of a resource.
-	 *
-	 * @param resourceId The ID under which the resource is registered.
-	 * @param message    Optional message with details, for logging and debugging.
-	 */
-
-	void resourceRemoved(final ResourceID resourceId, final String message);
-
-	/**
-	 * Notifies the JobManager that the checkpoint of an individual task is completed.
-	 *
-	 * @param acknowledge The acknowledge message of the checkpoint
-	 */
-	void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge);
-
-	/**
-	 * Notifies the JobManager that a checkpoint request could not be heeded.
-	 * This can happen if a Task is already in RUNNING state but is internally not yet ready to perform checkpoints.
-	 *
-	 * @param decline The decline message of the checkpoint
-	 */
-	void declineCheckpoint(final DeclineCheckpoint decline);
 
 	/**
 	 * Requests a {@link KvStateLocation} for the specified {@link KvState} registration name.
@@ -150,7 +131,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param registrationName Name under which the KvState has been registered.
 	 * @return Future of the requested {@link KvState} location
 	 */
-	Future<KvStateLocation> lookupKvStateLocation(final String registrationName) throws Exception;
+	Future<KvStateLocation> lookupKvStateLocation(final String registrationName);
 
 	/**
 	 * @param jobVertexId          JobVertexID the KvState instance belongs to.
@@ -160,11 +141,11 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param kvStateServerAddress Server address where to find the KvState instance.
 	 */
 	void notifyKvStateRegistered(
-		final JobVertexID jobVertexId,
-		final KeyGroupRange keyGroupRange,
-		final String registrationName,
-		final KvStateID kvStateId,
-		final KvStateServerAddress kvStateServerAddress);
+			final JobVertexID jobVertexId,
+			final KeyGroupRange keyGroupRange,
+			final String registrationName,
+			final KvStateID kvStateId,
+			final KvStateServerAddress kvStateServerAddress);
 
 	/**
 	 * @param jobVertexId      JobVertexID the KvState instance belongs to.
@@ -172,24 +153,17 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * @param registrationName Name under which the KvState has been registered.
 	 */
 	void notifyKvStateUnregistered(
-		JobVertexID jobVertexId,
-		KeyGroupRange keyGroupRange,
-		String registrationName);
+			JobVertexID jobVertexId,
+			KeyGroupRange keyGroupRange,
+			String registrationName);
 
 	/**
 	 * Notifies the JobManager to trigger a savepoint for this job.
 	 *
-	 * @return Future of the savepoint trigger response.
-	 */
-	Future<TriggerSavepointResponse> triggerSavepoint();
-
-	/**
-	 * Notifies the Jobmanager to dispose specified savepoint.
-	 *
-	 * @param savepointPath The path of the savepoint.
-	 * @return The future of the savepoint disponse response.
+	 * @param leaderSessionID The leader session id of the JobManager
+	 * @return The future of the savepoint path
 	 */
-	Future<DisposeSavepointResponse> disposeSavepoint(final String savepointPath);
+	Future<String> triggerSavepoint(final UUID leaderSessionID);
 
 	/**
 	 * Request the classloading props of this job.

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
index e8fb5bb..019ccfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import org.slf4j.Logger;
@@ -62,6 +64,9 @@ public class MiniClusterJobDispatcher {
 	/** al the services that the JobManager needs, such as BLOB service, factories, etc */
 	private final JobManagerServices jobManagerServices;
 
+	/** Registry for all metrics in the mini cluster */
+	private final MetricRegistry metricRegistry;
+
 	/** The number of JobManagers to launch (more than one simulates a high-availability setup) */
 	private final int numJobManagers;
 
@@ -86,8 +91,9 @@ public class MiniClusterJobDispatcher {
 	public MiniClusterJobDispatcher(
 			Configuration config,
 			RpcService rpcService,
-			HighAvailabilityServices haServices) throws Exception {
-		this(config, rpcService, haServices, 1);
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry) throws Exception {
+		this(config, rpcService, haServices, metricRegistry, 1);
 	}
 
 	/**
@@ -106,16 +112,18 @@ public class MiniClusterJobDispatcher {
 			Configuration config,
 			RpcService rpcService,
 			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry,
 			int numJobManagers) throws Exception {
 
 		checkArgument(numJobManagers >= 1);
 		this.configuration = checkNotNull(config);
 		this.rpcService = checkNotNull(rpcService);
 		this.haServices = checkNotNull(haServices);
+		this.metricRegistry = checkNotNull(metricRegistry);
 		this.numJobManagers = numJobManagers;
 
 		LOG.info("Creating JobMaster services");
-		this.jobManagerServices = JobManagerServices.fromConfiguration(config);
+		this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
 	}
 
 	// ------------------------------------------------------------------------
@@ -140,9 +148,8 @@ public class MiniClusterJobDispatcher {
 				if (runners != null) {
 					this.runners = null;
 
-					Exception shutdownException = new Exception("The MiniCluster is shutting down");
 					for (JobManagerRunner runner : runners) {
-						runner.shutdown(shutdownException);
+						runner.shutdown();
 					}
 				}
 			}
@@ -171,9 +178,9 @@ public class MiniClusterJobDispatcher {
 			checkState(!shutdown, "mini cluster is shut down");
 			checkState(runners == null, "mini cluster can only execute one job at a time");
 
-			OnCompletionActions onJobCompletion = new DetachedFinalizer(numJobManagers);
+			DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
 
-			this.runners = startJobRunners(job, onJobCompletion);
+			this.runners = startJobRunners(job, finalizer, finalizer);
 		}
 	}
 
@@ -191,17 +198,17 @@ public class MiniClusterJobDispatcher {
 		checkNotNull(job);
 		
 		LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
-		final BlockingJobSync onJobCompletion = new BlockingJobSync(job.getJobID(), numJobManagers);
+		final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
 
 		synchronized (lock) {
 			checkState(!shutdown, "mini cluster is shut down");
 			checkState(runners == null, "mini cluster can only execute one job at a time");
 
-			this.runners = startJobRunners(job, onJobCompletion);
+			this.runners = startJobRunners(job, sync, sync);
 		}
 
 		try {
-			return onJobCompletion.getResult();
+			return sync.getResult();
 		}
 		finally {
 			// always clear the status for the next job
@@ -209,24 +216,26 @@ public class MiniClusterJobDispatcher {
 		}
 	}
 
-	private JobManagerRunner[] startJobRunners(JobGraph job, OnCompletionActions onCompletion) throws JobExecutionException {
+	private JobManagerRunner[] startJobRunners(
+			JobGraph job,
+			OnCompletionActions onCompletion,
+			FatalErrorHandler errorHandler) throws JobExecutionException {
 		LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
 
 		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
 		for (int i = 0; i < numJobManagers; i++) {
 			try {
 				runners[i] = new JobManagerRunner(job, configuration,
-						rpcService, haServices, jobManagerServices, onCompletion);
+						rpcService, haServices, jobManagerServices, metricRegistry, 
+						onCompletion, errorHandler);
 				runners[i].start();
 			}
 			catch (Throwable t) {
 				// shut down all the ones so far
-				Exception shutdownCause = new Exception("Failed to properly start all mini cluster JobManagers", t);
-
 				for (int k = 0; k <= i; k++) {
 					try {
 						if (runners[i] != null) {
-							runners[i].shutdown(shutdownCause);
+							runners[i].shutdown();
 						}
 					} catch (Throwable ignored) {
 						// silent shutdown
@@ -244,15 +253,15 @@ public class MiniClusterJobDispatcher {
 	//  test methods to simulate job master failures
 	// ------------------------------------------------------------------------
 
-	public void killJobMaster(int which) {
-		checkArgument(which >= 0 && which < numJobManagers, "no such job master");
-		checkState(!shutdown, "mini cluster is shut down");
-
-		JobManagerRunner[] runners = this.runners;
-		checkState(runners != null, "mini cluster it not executing a job right now");
-
-		runners[which].shutdown(new Throwable("kill JobManager"));
-	}
+//	public void killJobMaster(int which) {
+//		checkArgument(which >= 0 && which < numJobManagers, "no such job master");
+//		checkState(!shutdown, "mini cluster is shut down");
+//
+//		JobManagerRunner[] runners = this.runners;
+//		checkState(runners != null, "mini cluster it not executing a job right now");
+//
+//		runners[which].shutdown(new Throwable("kill JobManager"));
+//	}
 
 	// ------------------------------------------------------------------------
 	//  utility classes
@@ -263,7 +272,7 @@ public class MiniClusterJobDispatcher {
 	 * In the case of a high-availability test setup, there may be multiple runners.
 	 * After that, it marks the mini cluster as ready to receive new jobs.
 	 */
-	private class DetachedFinalizer implements OnCompletionActions {
+	private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
 
 		private final AtomicInteger numJobManagersToWaitFor;
 
@@ -308,7 +317,7 @@ public class MiniClusterJobDispatcher {
 	 * That way it is guaranteed that after the blocking job submit call returns,
 	 * the dispatcher is immediately free to accept another job.
 	 */
-	private static class BlockingJobSync implements OnCompletionActions {
+	private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
 
 		private final JobID jobId;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
deleted file mode 100644
index 42bfc71..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.message;
-
-import java.io.Serializable;
-
-/**
- * The response of the dispose savepoint request to JobManager.
- */
-public abstract class DisposeSavepointResponse implements Serializable {
-
-	private static final long serialVersionUID = 6008792963949369567L;
-
-	public static class Success extends DisposeSavepointResponse implements Serializable {
-
-		private static final long serialVersionUID = 1572462960008711415L;
-	}
-
-	public static class Failure extends DisposeSavepointResponse implements Serializable {
-
-		private static final long serialVersionUID = -7505308325483022458L;
-
-		private final Throwable cause;
-
-		public Failure(final Throwable cause) {
-			this.cause = cause;
-		}
-
-		public Throwable getCause() {
-			return cause;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
deleted file mode 100644
index 0b0edc5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.message;
-
-import org.apache.flink.api.common.JobID;
-
-import java.io.Serializable;
-
-/**
- * The response of the trigger savepoint request to JobManager.
- */
-public abstract class TriggerSavepointResponse implements Serializable {
-
-	private static final long serialVersionUID = 3139327824611807707L;
-
-	private final JobID jobID;
-
-	public JobID getJobID() {
-		return jobID;
-	}
-
-	public TriggerSavepointResponse(final JobID jobID) {
-		this.jobID = jobID;
-	}
-
-	public static class Success extends TriggerSavepointResponse implements Serializable {
-
-		private static final long serialVersionUID = -1100637460388881776L;
-
-		private final String savepointPath;
-
-		public Success(final JobID jobID, final String savepointPath) {
-			super(jobID);
-			this.savepointPath = savepointPath;
-		}
-
-		public String getSavepointPath() {
-			return savepointPath;
-		}
-	}
-
-	public static class Failure extends TriggerSavepointResponse implements Serializable {
-
-		private static final long serialVersionUID = -1668479003490615139L;
-
-		private final Throwable cause;
-
-		public Failure(final JobID jobID, final Throwable cause) {
-			super(jobID);
-			this.cause = cause;
-		}
-
-		public Throwable getCause() {
-			return cause;
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 2052f98..4b9100a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit;
 public interface RpcService {
 
 	/**
-	 * Return the address under which the rpc service can be reached. If the rpc service cannot be
-	 * contacted remotely, then it will return an empty string.
+	 * Return the hostname or host address under which the rpc service can be reached.
+	 * If the rpc service cannot be contacted remotely, then it will return an empty string.
 	 *
 	 * @return Address of the rpc service or empty string if local rpc service
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index ef62ef1..6fcd082 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -26,11 +26,16 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.util.Preconditions;
 
+import java.util.UUID;
+
 /**
  * Container class for JobManager specific communication utils used by the {@link TaskExecutor}.
  */
 public class JobManagerConnection {
 
+	// Job master leader session id
+	private final UUID jobMasterLeaderId;
+
 	// Gateway to the job master
 	private final JobMasterGateway jobMasterGateway;
 
@@ -50,13 +55,15 @@ public class JobManagerConnection {
 	private final PartitionStateChecker partitionStateChecker;
 
 	public JobManagerConnection(
-		JobMasterGateway jobMasterGateway,
-		TaskManagerActions taskManagerActions,
-		CheckpointResponder checkpointResponder,
-		LibraryCacheManager libraryCacheManager,
-		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
-		PartitionStateChecker partitionStateChecker) {
-
+			UUID jobMasterLeaderId,
+			JobMasterGateway jobMasterGateway,
+			TaskManagerActions taskManagerActions,
+			CheckpointResponder checkpointResponder,
+			LibraryCacheManager libraryCacheManager,
+			ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
+			PartitionStateChecker partitionStateChecker)
+	{
+		this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId);
 		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
 		this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
 		this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
@@ -65,6 +72,10 @@ public class JobManagerConnection {
 		this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
 	}
 
+	public UUID getJobMasterLeaderId() {
+		return jobMasterLeaderId;
+	}
+
 	public JobMasterGateway getJobManagerGateway() {
 		return jobMasterGateway;
 	}