You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/02 21:58:43 UTC

[38/50] [abbrv] flink git commit: [FLINK-4408] [JobManager] Introduce JobMasterRunner and implement job submission & setting up the ExecutionGraph

[FLINK-4408] [JobManager] Introduce JobMasterRunner and implement job submission & setting up the ExecutionGraph

This closes #2480


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9816484e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9816484e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9816484e

Branch: refs/heads/flip-6
Commit: 9816484eb5403d0394f78d5430fb2ab8575f916d
Parents: 18f26e0
Author: Kurt Young <yk...@gmail.com>
Authored: Thu Sep 8 12:00:13 2016 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 2 23:44:46 2016 +0200

----------------------------------------------------------------------
 .../flink/api/common/JobExecutionResult.java    |   2 +-
 .../flink/api/common/JobSubmissionResult.java   |   2 +-
 .../HighAvailabilityServices.java               |  12 +
 .../runtime/highavailability/NonHaServices.java |  16 +-
 .../runtime/jobmanager/OnCompletionActions.java |  31 ++
 .../runtime/jobmanager/scheduler/Scheduler.java |   9 +
 .../runtime/jobmaster/JobManagerRunner.java     | 288 +++++++++++
 .../runtime/jobmaster/JobManagerServices.java   |  73 +++
 .../flink/runtime/jobmaster/JobMaster.java      | 485 ++++++++++++++-----
 .../runtime/jobmaster/JobMasterGateway.java     |  13 +
 .../jobmaster/MiniClusterJobDispatcher.java     | 385 +++++++++++++++
 .../flink/runtime/rpc/FatalErrorHandler.java    |  24 +
 .../runtime/taskexecutor/TaskExecutor.java      |  12 +
 .../TestingHighAvailabilityServices.java        |  39 +-
 .../jobmaster/JobManagerRunnerMockTest.java     | 254 ++++++++++
 .../flink/runtime/rpc/RpcConnectionTest.java    |  17 +-
 16 files changed, 1533 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index cb4ecc5..7286cc5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
 @Public
 public class JobExecutionResult extends JobSubmissionResult {
 
-	private long netRuntime;
+	private final long netRuntime;
 
 	private final Map<String, Object> accumulatorResults;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
index c5dc869..b0e7e24 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
@@ -26,7 +26,7 @@ import org.apache.flink.annotation.Public;
 @Public
 public class JobSubmissionResult {
 
-	private JobID jobID;
+	private final JobID jobID;
 
 	public JobSubmissionResult(JobID jobID) {
 		this.jobID = jobID;

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 7634176..d67e927 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
@@ -61,4 +63,14 @@ public interface HighAvailabilityServices {
 	 * @param jobID The identifier of the job running the election.
 	 */
 	LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception;
+
+	/**
+	 * Gets the checkpoint recovery factory for the job manager.
+	 *
+	 * @return The checkpoint recovery factory
+	 * @throws Exception if the implementation fails to provide the factory
+	 */
+	CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception;
+
+	/**
+	 * Gets the submitted job graph store for the job manager.
+	 *
+	 * @return The submitted job graph store
+	 * @throws Exception if the implementation fails to provide the store
+	 */
+	SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 33dc2d7..a2c9cc4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -19,13 +19,17 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -88,4 +92,14 @@ public class NonHaServices implements HighAvailabilityServices {
 	public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
 		return new StandaloneLeaderElectionService();
 	}
+
+	/**
+	 * In the non-HA setup, a new {@link StandaloneCheckpointRecoveryFactory} is created for every call.
+	 */
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+		final CheckpointRecoveryFactory recoveryFactory = new StandaloneCheckpointRecoveryFactory();
+		return recoveryFactory;
+	}
+
+	/**
+	 * In the non-HA setup, a new {@link StandaloneSubmittedJobGraphStore} is created for every call.
+	 */
+	@Override
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+		final SubmittedJobGraphStore jobGraphStore = new StandaloneSubmittedJobGraphStore();
+		return jobGraphStore;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
new file mode 100644
index 0000000..6de4253
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+
+/**
+ * Callbacks through which the outcome of a job is reported, e.g. by the
+ * {@link org.apache.flink.runtime.jobmaster.JobMaster} to the component that runs it.
+ * Fatal errors are reported through the inherited {@link FatalErrorHandler#onFatalError(Throwable)}.
+ */
+public interface OnCompletionActions extends FatalErrorHandler {
+
+	/**
+	 * Invoked when the job has finished.
+	 *
+	 * @param result The execution result of the finished job
+	 */
+	void jobFinished(JobExecutionResult result);
+
+	/**
+	 * Invoked when the job has failed.
+	 *
+	 * @param cause The cause of the failure
+	 */
+	void jobFailed(Throwable cause);
+
+	/**
+	 * Invoked when the job turns out to have already been finished by another job manager,
+	 * so this one does not need to execute it.
+	 */
+	void jobFinishedByOther();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index b839e0e..aa09314 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -31,6 +31,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import akka.dispatch.Futures;
@@ -57,6 +58,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
 
 /**
  * The scheduler is responsible for distributing the ready-to-run tasks among instances and slots.
@@ -110,6 +112,13 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 	/**
 	 * Creates a new scheduler.
 	 */
+	public Scheduler(ExecutorService executor) {
+		// Fail fast on null: per its Scaladoc, ExecutionContext.fromExecutor(null) silently falls back
+		// to a newly created default thread pool instead of the executor the caller meant to supply.
+		this(ExecutionContext$.MODULE$.fromExecutor(java.util.Objects.requireNonNull(executor, "executor")));
+	}
+
+	/**
+	 * Creates a new scheduler.
+	 */
 	public Scheduler(ExecutionContext executionContext) {
 		this.executionContext = executionContext;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
new file mode 100644
index 0000000..bc2bf9a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+/**
+ * The runner for the job manager. It deals with job level leader election and make underlying job manager
+ * properly reacted.
+ */
+public class JobManagerRunner implements LeaderContender, OnCompletionActions {
+
+	private final Logger log = LoggerFactory.getLogger(JobManagerRunner.class);
+
+	/** Lock to ensure that this runner can deal with leader election event and job completion notifies simultaneously */
+	private final Object lock = new Object();
+
+	/** The job graph needs to run */
+	private final JobGraph jobGraph;
+
+	private final OnCompletionActions toNotify;
+
+	/** The execution context which is used to execute futures */
+	private final Executor executionContext;
+
+	// TODO: use this to decide whether the job is finished by other
+	private final CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+	/** Leader election for this job */
+	private final LeaderElectionService leaderElectionService;
+
+	private final JobMaster jobManager;
+
+	/** Leader session id when granted leadership */
+	private UUID leaderSessionID;
+
+	/** flag marking the runner as shut down */
+	private volatile boolean shutdown;
+
+	public JobManagerRunner(
+		final JobGraph jobGraph,
+		final Configuration configuration,
+		final RpcService rpcService,
+		final HighAvailabilityServices haServices,
+		final OnCompletionActions toNotify) throws Exception
+	{
+		this(jobGraph, configuration, rpcService, haServices,
+			JobManagerServices.fromConfiguration(configuration), toNotify);
+	}
+
+	public JobManagerRunner(
+		final JobGraph jobGraph,
+		final Configuration configuration,
+		final RpcService rpcService,
+		final HighAvailabilityServices haServices,
+		final JobManagerServices jobManagerServices,
+		final OnCompletionActions toNotify) throws Exception
+	{
+		this.jobGraph = jobGraph;
+		this.toNotify = toNotify;
+		this.executionContext = rpcService.getExecutor();
+		this.checkpointRecoveryFactory = haServices.getCheckpointRecoveryFactory();
+		this.leaderElectionService = haServices.getJobMasterLeaderElectionService(jobGraph.getJobID());
+		this.leaderSessionID = null;
+
+		this.jobManager = new JobMaster(
+			jobGraph, configuration, rpcService, haServices,
+			jobManagerServices.libraryCacheManager,
+			jobManagerServices.restartStrategyFactory,
+			jobManagerServices.savepointStore,
+			jobManagerServices.timeout,
+			new Scheduler(jobManagerServices.executorService),
+			jobManagerServices.jobManagerMetricGroup,
+			this);
+	}
+
+	//----------------------------------------------------------------------------------------------
+	// Lifecycle management
+	//----------------------------------------------------------------------------------------------
+
+	public void start() throws Exception {
+		jobManager.init();
+		jobManager.start();
+
+		try {
+			leaderElectionService.start(this);
+		}
+		catch (Exception e) {
+			log.error("Could not start the JobManager because the leader election service did not start.", e);
+			throw new Exception("Could not start the leader election service.", e);
+		}
+	}
+
+	public void shutdown() {
+		shutdown(new Exception("The JobManager runner is shutting down"));
+	}
+
+	public void shutdown(Throwable cause) {
+		// TODO what is the cause used for ?
+		shutdownInternally();
+	}
+
+	private void shutdownInternally() {
+		synchronized (lock) {
+			shutdown = true;
+
+			if (leaderElectionService != null) {
+				try {
+					leaderElectionService.stop();
+				} catch (Exception e) {
+					log.error("Could not properly shutdown the leader election service.");
+				}
+			}
+
+			jobManager.shutDown();
+		}
+	}
+
+	//----------------------------------------------------------------------------------------------
+	// Result and error handling methods
+	//----------------------------------------------------------------------------------------------
+
+	/**
+	 * Job completion notification triggered by JobManager
+	 */
+	@Override
+	public void jobFinished(JobExecutionResult result) {
+		try {
+			shutdownInternally();
+		}
+		finally {
+			if (toNotify != null) {
+				toNotify.jobFinished(result);
+			}
+		}
+	}
+
+	/**
+	 * Job completion notification triggered by JobManager
+	 */
+	@Override
+	public void jobFailed(Throwable cause) {
+		try {
+			shutdownInternally();
+		}
+		finally {
+			if (toNotify != null) {
+				toNotify.jobFailed(cause);
+			}
+		}
+	}
+
+	/**
+	 * Job completion notification triggered by self
+	 */
+	@Override
+	public void jobFinishedByOther() {
+		try {
+			shutdownInternally();
+		}
+		finally {
+			if (toNotify != null) {
+				toNotify.jobFinishedByOther();
+			}
+		}
+	}
+
+	/**
+	 * Job completion notification triggered by JobManager or self
+	 */
+	@Override
+	public void onFatalError(Throwable exception) {
+		// first and in any case, notify our handler, so it can react fast
+		try {
+			if (toNotify != null) {
+				toNotify.onFatalError(exception);
+			}
+		}
+		finally {
+			log.error("JobManager runner encountered a fatal error.", exception);
+			shutdownInternally();
+		}
+	}
+
+	//----------------------------------------------------------------------------------------------
+	// Leadership methods
+	//----------------------------------------------------------------------------------------------
+
+	@Override
+	public void grantLeadership(final UUID leaderSessionID) {
+		synchronized (lock) {
+			if (shutdown) {
+				log.info("JobManagerRunner already shutdown.");
+				return;
+			}
+
+			log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
+				jobGraph.getName(), jobGraph.getJobID(), leaderSessionID, getAddress());
+
+			// The operation may be blocking, but since this runner is idle before it been granted the leadership,
+			// it's okay that job manager wait for the operation complete
+			leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+			this.leaderSessionID = leaderSessionID;
+
+			// Double check the leadership after we confirm that, there is a small chance that multiple
+			// job managers schedule the same job after if they try to recover at the same time.
+			// This will eventually be noticed, but can not be ruled out from the beginning.
+			if (leaderElectionService.hasLeadership()) {
+				if (isJobFinishedByOthers()) {
+					log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID());
+					jobFinishedByOther();
+				} else {
+					jobManager.getSelf().startJob();
+				}
+			}
+		}
+	}
+
+	@Override
+	public void revokeLeadership() {
+		synchronized (lock) {
+			if (shutdown) {
+				log.info("JobManagerRunner already shutdown.");
+				return;
+			}
+
+			log.info("JobManager for job {} ({}) was revoked leadership at {}.",
+				jobGraph.getName(), jobGraph.getJobID(), getAddress());
+
+			leaderSessionID = null;
+			jobManager.getSelf().suspendJob(new Exception("JobManager is no longer the leader."));
+		}
+	}
+
+	@Override
+	public String getAddress() {
+		return jobManager.getAddress();
+	}
+
+	@Override
+	public void handleError(Exception exception) {
+		log.error("Leader Election Service encountered a fatal error.", exception);
+		onFatalError(exception);
+	}
+
+	@VisibleForTesting
+	boolean isJobFinishedByOthers() {
+		// TODO
+		return false;
+	}
+
+	@VisibleForTesting
+	boolean isShutdown() {
+		return shutdown;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
new file mode 100644
index 0000000..e6beba6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class to hold all auxiliary services used by the {@link JobMaster}.
+ *
+ * <p>All services are mandatory: the constructor rejects {@code null} for each of them and names
+ * the missing service in the exception message.
+ */
+public class JobManagerServices {
+
+	/** Executor service for the job manager's asynchronous work (e.g. backing the job's scheduler) */
+	public final ExecutorService executorService;
+
+	/** Blob library cache manager providing the user code class loaders */
+	public final BlobLibraryCacheManager libraryCacheManager;
+
+	/** Factory for the restart strategy of a job */
+	public final RestartStrategyFactory restartStrategyFactory;
+
+	/** Store for savepoints */
+	public final SavepointStore savepointStore;
+
+	/** Timeout handed to the JobMaster */
+	public final Time timeout;
+
+	/** Metric group of the job manager */
+	public final JobManagerMetricGroup jobManagerMetricGroup;
+
+	/**
+	 * Bundles the given services.
+	 *
+	 * @param executorService The executor service
+	 * @param libraryCacheManager The blob library cache manager
+	 * @param restartStrategyFactory The restart strategy factory
+	 * @param savepointStore The savepoint store
+	 * @param timeout The timeout
+	 * @param jobManagerMetricGroup The job manager metric group
+	 * @throws NullPointerException if any argument is null; the message names the argument
+	 */
+	public JobManagerServices(
+			ExecutorService executorService,
+			BlobLibraryCacheManager libraryCacheManager,
+			RestartStrategyFactory restartStrategyFactory,
+			SavepointStore savepointStore,
+			Time timeout,
+			JobManagerMetricGroup jobManagerMetricGroup) {
+
+		this.executorService = checkNotNull(executorService, "executorService");
+		this.libraryCacheManager = checkNotNull(libraryCacheManager, "libraryCacheManager");
+		this.restartStrategyFactory = checkNotNull(restartStrategyFactory, "restartStrategyFactory");
+		this.savepointStore = checkNotNull(savepointStore, "savepointStore");
+		this.timeout = checkNotNull(timeout, "timeout");
+		this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup, "jobManagerMetricGroup");
+	}
+
+	// ------------------------------------------------------------------------
+	//  Creating the components from a configuration
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the job manager services from the given configuration.
+	 *
+	 * <p><b>Not yet implemented:</b> this method currently always returns {@code null}; callers must
+	 * not dereference the result until it is implemented.
+	 *
+	 * @param config The configuration to derive the services from
+	 * @return currently always {@code null}
+	 * @throws Exception presumably reserved for failures while setting up the services once implemented
+	 */
+	public static JobManagerServices fromConfiguration(Configuration config) throws Exception {
+		// TODO not yet implemented
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1537396..b52a23c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,21 +18,50 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.util.Preconditions;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.duration.FiniteDuration;
 
-import java.util.UUID;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * JobMaster implementation. The job master is responsible for the execution of a single
@@ -41,7 +70,7 @@ import java.util.UUID;
  * It offers the following methods as part of its rpc interface to interact with the JobMaster
  * remotely:
  * <ul>
- *     <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
+ * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
  * given task</li>
  * </ul>
  */
@@ -52,7 +81,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	/** Logical representation of the job */
 	private final JobGraph jobGraph;
-	private final JobID jobID;
 
 	/** Configuration of the job */
 	private final Configuration configuration;
@@ -60,32 +88,67 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** Service to contend for and retrieve the leadership of JM and RM */
 	private final HighAvailabilityServices highAvailabilityServices;
 
-	/** Leader Management */
-	private LeaderElectionService leaderElectionService = null;
-	private UUID leaderSessionID;
+	/** Blob cache manager used across jobs */
+	private final BlobLibraryCacheManager libraryCacheManager;
+
+	/** Factory to create restart strategy for this job */
+	private final RestartStrategyFactory restartStrategyFactory;
+
+	/** Store for save points */
+	private final SavepointStore savepointStore;
+
+	/** The timeout for this job */
+	private final Time timeout;
+
+	/** The scheduler to use for scheduling new tasks as they are needed */
+	private final Scheduler scheduler;
+
+	/** The metrics group used across jobs */
+	private final JobManagerMetricGroup jobManagerMetricGroup;
+
+	/** The execution context which is used to execute futures */
+	private final Executor executionContext;
+
+	private final OnCompletionActions jobCompletionActions;
+
+	/** The execution graph of this job */
+	private volatile ExecutionGraph executionGraph;
+
+	/** The checkpoint recovery factory used by this job */
+	private CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+	private ClassLoader userCodeLoader;
+
+	private RestartStrategy restartStrategy;
+
+	private MetricGroup jobMetrics;
 
-	/**
-	 * The JM's Constructor
-	 *
-	 * @param jobGraph The representation of the job's execution plan
-	 * @param configuration The job's configuration
-	 * @param rpcService The RPC service at which the JM serves
-	 * @param highAvailabilityService The cluster's HA service from the JM can elect and retrieve leaders.
-	 */
+	/**
+	 * Creates the JobMaster for the given job.
+	 *
+	 * @param jobGraph The logical representation of the job
+	 * @param configuration The job's configuration
+	 * @param rpcService The RPC service at which the JobMaster serves; also provides the executor
+	 *                   used to execute futures
+	 * @param highAvailabilityService The cluster's HA services
+	 * @param libraryCacheManager The blob library cache manager used across jobs
+	 * @param restartStrategyFactory The factory for the job's restart strategy, used when the job
+	 *                               does not configure a restart strategy itself
+	 * @param savepointStore The store for savepoints
+	 * @param timeout The timeout for this job
+	 * @param scheduler The scheduler used for scheduling new tasks
+	 * @param jobManagerMetricGroup The metric group used across jobs
+	 * @param jobCompletionActions Receiver of the job's completion notifications
+	 * @throws NullPointerException if any argument, or the executor of the RPC service, is null
+	 */
 	public JobMaster(
 		JobGraph jobGraph,
 		Configuration configuration,
 		RpcService rpcService,
-		HighAvailabilityServices highAvailabilityService) {
-
+		HighAvailabilityServices highAvailabilityService,
+		BlobLibraryCacheManager libraryCacheManager,
+		RestartStrategyFactory restartStrategyFactory,
+		SavepointStore savepointStore,
+		Time timeout,
+		Scheduler scheduler,
+		JobManagerMetricGroup jobManagerMetricGroup,
+		OnCompletionActions jobCompletionActions)
+	{
 		super(rpcService);
 
-		this.jobGraph = Preconditions.checkNotNull(jobGraph);
-		this.jobID = Preconditions.checkNotNull(jobGraph.getJobID());
-
-		this.configuration = Preconditions.checkNotNull(configuration);
-
-		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityService);
+		this.jobGraph = checkNotNull(jobGraph);
+		this.configuration = checkNotNull(configuration);
+		this.highAvailabilityServices = checkNotNull(highAvailabilityService);
+		this.libraryCacheManager = checkNotNull(libraryCacheManager);
+		this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
+		this.savepointStore = checkNotNull(savepointStore);
+		this.timeout = checkNotNull(timeout);
+		this.scheduler = checkNotNull(scheduler);
+		this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
+		this.executionContext = checkNotNull(rpcService.getExecutor());
+		this.jobCompletionActions = checkNotNull(jobCompletionActions);
 	}
 
 	public ResourceManagerGateway getResourceManager() {
@@ -93,93 +156,294 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	//----------------------------------------------------------------------------------------------
-	// Initialization methods
+	// Lifecycle management
 	//----------------------------------------------------------------------------------------------
+
+	/**
+	 * Initializes the job execution environment. This must be called before the job is started;
+	 * any error that occurs during initialization is treated as a job submission failure.
+	 *
+	 * <p>Initialization registers the job's user code libraries and obtains their class loader,
+	 * rejects empty jobs, selects the restart strategy, sets up the job's metric group, and obtains
+	 * the checkpoint recovery factory from the high-availability services. If any of these steps
+	 * fails, the job is unregistered from the library cache manager again.
+	 *
+	 * @throws JobSubmissionException If any initialization step fails; errors of other types are
+	 *                                wrapped into a JobSubmissionException
+	 */
+	public void init() throws JobSubmissionException {
+		log.info("Initializing job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
+
+		try {
+			// IMPORTANT: We need to make sure that the library registration is the first action,
+			// because this makes sure that the uploaded jar files are removed in case of
+			// unsuccessful initialization (the outer catch block unregisters the job again)
+			try {
+				libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(),
+					jobGraph.getClasspaths());
+			} catch (Throwable t) {
+				throw new JobSubmissionException(jobGraph.getJobID(),
+					"Cannot set up the user code libraries: " + t.getMessage(), t);
+			}
+
+			userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
+			if (userCodeLoader == null) {
+				throw new JobSubmissionException(jobGraph.getJobID(),
+					"The user code class loader could not be initialized.");
+			}
+
+			if (jobGraph.getNumberOfVertices() == 0) {
+				throw new JobSubmissionException(jobGraph.getJobID(), "The given job is empty");
+			}
+
+			// a restart strategy configured in the job's ExecutionConfig takes precedence over
+			// the default strategy created by the restart strategy factory
+			final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
+				jobGraph.getSerializedExecutionConfig()
+					.deserializeValue(userCodeLoader)
+					.getRestartStrategy();
+			if (restartStrategyConfiguration != null) {
+				restartStrategy = RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration);
+			} else {
+				restartStrategy = restartStrategyFactory.createRestartStrategy();
+			}
+
+			log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobGraph.getName(), jobGraph.getJobID());
+
+			// the constructor already rejects a null metric group, so this null check is only defensive;
+			// fall back to an unregistered metric group if no job metric group was created
+			if (jobManagerMetricGroup != null) {
+				jobMetrics = jobManagerMetricGroup.addJob(jobGraph);
+			}
+			if (jobMetrics == null) {
+				jobMetrics = new UnregisteredMetricsGroup();
+			}
+
+			try {
+				checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
+			} catch (Exception e) {
+				log.error("Could not get the checkpoint recovery factory.", e);
+				throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint recovery factory.", e);
+			}
+
+		} catch (Throwable t) {
+			log.error("Failed to initializing job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
+
+			// undo the library registration done at the beginning of this method
+			libraryCacheManager.unregisterJob(jobGraph.getJobID());
+
+			// rethrow submission failures as they are, wrap everything else
+			if (t instanceof JobSubmissionException) {
+				throw (JobSubmissionException) t;
+			} else {
+				throw new JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " +
+					jobGraph.getName() + " (" + jobGraph.getJobID() + ")", t);
+			}
+		}
+	}
+
+	/**
+	 * Starts the RPC endpoint only. The job itself is initialized via {@link #init()} and
+	 * launched via {@link #startJob()}.
+	 */
+	@Override
 	public void start() {
 		super.start();
-
-		// register at the election once the JM starts
-		registerAtElectionService();
 	}
 
+	/**
+	 * Shuts down the RPC endpoint and afterwards suspends the job (see {@link #suspendJob(Throwable)}).
+	 */
+	@Override
+	public void shutDown() {
+		super.shutDown();
+
+		// NOTE(review): suspendJob is called directly (not through the RPC gateway) after the endpoint
+		// has been shut down; confirm that no RPC handler can still access executionGraph at this point
+		suspendJob(new Exception("JobManager is shutting down."));
+	}
 
 	//----------------------------------------------------------------------------------------------
-	// JobMaster Leadership methods
+	// RPC methods
 	//----------------------------------------------------------------------------------------------
 
 	/**
-	 * Retrieves the election service and contend for the leadership.
+	 * Starts running the job. If no {@link ExecutionGraph} exists yet, one is created from the job graph;
+	 * an existing one is reused. The graph is then configured (schedule mode, JSON plan, master-side vertex
+	 * initialization, topology, and checkpointing if the job has snapshot settings). Afterwards the job is
+	 * scheduled asynchronously on the execution context.
+	 *
+	 * <p>If setting up the execution graph fails, the graph is failed and discarded, and the failure is
+	 * reported via {@link OnCompletionActions#jobFailed(Throwable)}.
 	 */
-	private void registerAtElectionService() {
-		try {
-			leaderElectionService = highAvailabilityServices.getJobMasterLeaderElectionService(jobID);
-			leaderElectionService.start(new JobMasterLeaderContender());
-		} catch (Exception e) {
-			throw new RuntimeException("Fail to register at the election of JobMaster", e);
+	@RpcMethod
+	public void startJob() {
+		log.info("Starting job {} ({}).", jobGraph.getName(), jobGraph.getJobID());
+
+		// build a new execution graph only if there is none yet; otherwise the existing one is reused
+		if (executionGraph == null) {
+			executionGraph = new ExecutionGraph(
+				ExecutionContext$.MODULE$.fromExecutor(executionContext),
+				jobGraph.getJobID(),
+				jobGraph.getName(),
+				jobGraph.getJobConfiguration(),
+				jobGraph.getSerializedExecutionConfig(),
+				new FiniteDuration(timeout.getSize(), timeout.getUnit()),
+				restartStrategy,
+				jobGraph.getUserJarBlobKeys(),
+				jobGraph.getClasspaths(),
+				userCodeLoader,
+				jobMetrics);
+		} else {
+			// TODO: update last active time in JobInfo
 		}
-	}
 
-	/**
-	 * Start the execution when the leadership is granted.
-	 *
-	 * @param newLeaderSessionID The identifier of the new leadership session
-	 */
-	public void grantJobMasterLeadership(final UUID newLeaderSessionID) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				log.info("JobManager {} grants leadership with session id {}.", getAddress(), newLeaderSessionID);
+		try {
+			executionGraph.setScheduleMode(jobGraph.getScheduleMode());
+			executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
+
+			try {
+				executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
+			} catch (Exception e) {
+				log.warn("Cannot create JSON plan for job {} ({})", jobGraph.getJobID(), jobGraph.getName(), e);
+				executionGraph.setJsonPlan("{}");
+			}
 
-				// The operation may be blocking, but since JM is idle before it grants the leadership, it's okay that
-				// JM waits here for the operation's completeness.
-				leaderSessionID = newLeaderSessionID;
-				leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+			// initialize the vertices that have a master initialization hook
+			// file output formats create directories here, input formats create splits
+			if (log.isDebugEnabled()) {
+				log.debug("Running initialization on master for job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
+			}
+			for (JobVertex vertex : jobGraph.getVertices()) {
+				final String executableClass = vertex.getInvokableClassName();
+				if (executableClass == null || executableClass.length() == 0) {
+					throw new JobExecutionException(jobGraph.getJobID(),
+						"The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
+				}
+				if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+					vertex.setParallelism(scheduler.getTotalNumberOfSlots());
+				}
+
+				try {
+					vertex.initializeOnMaster(userCodeLoader);
+				} catch (Throwable t) {
+					throw new JobExecutionException(jobGraph.getJobID(),
+						"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
+				}
+			}
 
-				// TODO:: execute the job when the leadership is granted.
+			// topologically sort the job vertices and attach the graph to the existing one
+			final List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
+			if (log.isDebugEnabled()) {
+				log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(),
+					jobGraph.getJobID(), jobGraph.getName());
 			}
-		});
-	}
+			executionGraph.attachJobGraph(sortedTopology);
 
-	/**
-	 * Stop the execution when the leadership is revoked.
-	 */
-	public void revokeJobMasterLeadership() {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				log.info("JobManager {} was revoked leadership.", getAddress());
+			if (log.isDebugEnabled()) {
+				log.debug("Successfully created execution graph from job graph {} ({}).",
+					jobGraph.getJobID(), jobGraph.getName());
+			}
 
-				// TODO:: cancel the job's execution and notify all listeners
-				cancelAndClearEverything(new Exception("JobManager is no longer the leader."));
+			final JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings();
+			if (snapshotSettings != null) {
+				List<ExecutionJobVertex> triggerVertices = getExecutionJobVertexWithId(
+					executionGraph, snapshotSettings.getVerticesToTrigger());
+
+				List<ExecutionJobVertex> ackVertices = getExecutionJobVertexWithId(
+					executionGraph, snapshotSettings.getVerticesToAcknowledge());
+
+				List<ExecutionJobVertex> confirmVertices = getExecutionJobVertexWithId(
+					executionGraph, snapshotSettings.getVerticesToConfirm());
+
+				CompletedCheckpointStore completedCheckpoints = checkpointRecoveryFactory.createCheckpointStore(
+					jobGraph.getJobID(), userCodeLoader);
+
+				CheckpointIDCounter checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(
+					jobGraph.getJobID());
+
+				// Checkpoint stats tracker
+				boolean isStatsDisabled = configuration.getBoolean(
+					ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
+					ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
+
+				final CheckpointStatsTracker checkpointStatsTracker;
+				if (isStatsDisabled) {
+					checkpointStatsTracker = new DisabledCheckpointStatsTracker();
+				} else {
+					int historySize = configuration.getInteger(
+						ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+						ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
+					checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics);
+				}
+
+				executionGraph.enableSnapshotCheckpointing(
+					snapshotSettings.getCheckpointInterval(),
+					snapshotSettings.getCheckpointTimeout(),
+					snapshotSettings.getMinPauseBetweenCheckpoints(),
+					snapshotSettings.getMaxConcurrentCheckpoints(),
+					triggerVertices,
+					ackVertices,
+					confirmVertices,
+					checkpointIdCounter,
+					completedCheckpoints,
+					savepointStore,
+					checkpointStatsTracker);
+			}
+
+			// TODO: register this class to execution graph as job status change listeners
+
+			// TODO: register client as job / execution status change listeners if they are interested
+
+			/*
+			TODO: decide whether we should take the savepoint before recovery
+
+			if (isRecovery) {
+				// this is a recovery of a master failure (this master takes over)
+				executionGraph.restoreLatestCheckpointedState();
+			} else {
+				if (snapshotSettings != null) {
+					String savepointPath = snapshotSettings.getSavepointPath();
+					if (savepointPath != null) {
+						// got a savepoint
+						log.info("Starting job from savepoint {}.", savepointPath);
+
+						// load the savepoint as a checkpoint into the system
+						final CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
+							jobGraph.getJobID(), executionGraph.getAllVertices(), savepointStore, savepointPath);
+						executionGraph.getCheckpointCoordinator().getCheckpointStore().addCheckpoint(savepoint);
+
+						// Reset the checkpoint ID counter
+						long nextCheckpointId = savepoint.getCheckpointID() + 1;
+						log.info("Reset the checkpoint ID to " + nextCheckpointId);
+						executionGraph.getCheckpointCoordinator().getCheckpointIdCounter().setCount(nextCheckpointId);
 
-				leaderSessionID = null;
+						executionGraph.restoreLatestCheckpointedState();
+					}
+				}
 			}
-		});
-	}
+			*/
 
-	/**
-	 * Handles error occurring in the leader election service
-	 *
-	 * @param exception Exception thrown in the leader election service
-	 */
-	public void onJobMasterElectionError(final Exception exception) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				log.error("Received an error from the LeaderElectionService.", exception);
+		} catch (Throwable t) {
+			log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t);
 
-				// TODO:: cancel the job's execution and shutdown the JM
-				cancelAndClearEverything(exception);
+			executionGraph.fail(t);
+			executionGraph = null;
 
-				leaderSessionID = null;
+			final Throwable rt;
+			if (t instanceof JobExecutionException) {
+				rt = (JobExecutionException) t;
+			} else {
+				rt = new JobExecutionException(jobGraph.getJobID(),
+					"Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
 			}
-		});
 
+			// TODO: notify client about this failure
+
+			jobCompletionActions.jobFailed(rt);
+			return;
+		}
+
+		// start scheduling job in another thread
+		executionContext.execute(new Runnable() {
+			@Override
+			public void run() {
+				if (executionGraph != null) {
+					try {
+						executionGraph.scheduleForExecution(scheduler);
+					} catch (Throwable t) {
+						executionGraph.fail(t);
+					}
+				}
+			}
+		});
 	}
 
-	//----------------------------------------------------------------------------------------------
-	// RPC methods
-	//----------------------------------------------------------------------------------------------
+	/**
+	 * Suspends the job. If an execution graph exists, it is suspended (cancelling its running tasks)
+	 * and then dropped; if there is none, this call does nothing.
+	 *
+	 * @param cause The reason why the job is being suspended.
+	 */
+	@RpcMethod
+	public void suspendJob(final Throwable cause) {
+		if (executionGraph == null) {
+			// nothing is running, so there is nothing to suspend
+			return;
+		}
+
+		executionGraph.suspend(cause);
+		executionGraph = null;
+	}
 
 	/**
 	 * Updates the task execution state for a given task.
@@ -208,37 +472,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	//----------------------------------------------------------------------------------------------
 
 	/**
-	 * Cancel the current job and notify all listeners the job's cancellation.
+	 * Resolves each of the given job vertex IDs to its {@link ExecutionJobVertex}, keeping the order
+	 * of the IDs.
 	 *
-	 * @param cause Cause for the cancelling.
+	 * @param executionGraph The execution graph in which the vertices are looked up
+	 * @param vertexIDs      The IDs of the job vertices to resolve
+	 * @return The execution job vertices, one per given ID
+	 * @throws JobExecutionException If one of the IDs does not refer to a vertex of the execution graph
 	 */
-	private void cancelAndClearEverything(Throwable cause) {
-		// currently, nothing to do here
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utility classes
-	// ------------------------------------------------------------------------
-	private class JobMasterLeaderContender implements LeaderContender {
-
-		@Override
-		public void grantLeadership(UUID leaderSessionID) {
-			JobMaster.this.grantJobMasterLeadership(leaderSessionID);
-		}
-
-		@Override
-		public void revokeLeadership() {
-			JobMaster.this.revokeJobMasterLeadership();
-		}
-
-		@Override
-		public String getAddress() {
-			return JobMaster.this.getAddress();
-		}
-
-		@Override
-		public void handleError(Exception exception) {
-			onJobMasterElectionError(exception);
+	private static List<ExecutionJobVertex> getExecutionJobVertexWithId(
+		final ExecutionGraph executionGraph, final List<JobVertexID> vertexIDs)
+		throws JobExecutionException
+	{
+		final List<ExecutionJobVertex> resolved = new ArrayList<>(vertexIDs.size());
+		for (JobVertexID id : vertexIDs) {
+			final ExecutionJobVertex vertex = executionGraph.getJobVertex(id);
+			if (vertex == null) {
+				// the snapshot settings name a vertex that is not part of the attached job graph
+				throw new JobExecutionException(
+					executionGraph.getJobID(),
+					"The snapshot checkpointing settings refer to non-existent vertex " + id);
+			}
+			resolved.add(vertex);
 		}
+		return resolved;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 86bf17c..b281ea8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -29,6 +29,19 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 public interface JobMasterGateway extends RpcGateway {
 
 	/**
+	 * Starts running the job.
+	 */
+	void startJob();
+
+	/**
+	 * Suspends the job: all running tasks are cancelled and the runtime state is cleared.
+	 * The job has to be re-submitted before it can be restarted.
+	 *
+	 * @param cause The reason why the job is being suspended.
+	 */
+	void suspendJob(final Throwable cause);
+
+	/**
 	 * Updates the task execution state for a given task.
 	 *
 	 * @param taskExecutionState New task execution state for a given task

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
new file mode 100644
index 0000000..792bfd5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters
+ * upon receiving jobs.
+ */
+public class MiniClusterJobDispatcher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
+
+	// ------------------------------------------------------------------------
+
+	/** lock to ensure that this dispatcher executes only one job at a time */
+	private final Object lock = new Object();
+
+	/** the configuration with which the mini cluster was started */
+	private final Configuration configuration;
+
+	/** the RPC service to use by the job managers */
+	private final RpcService rpcService;
+
+	/** services for discovery, leader election, and recovery */
+	private final HighAvailabilityServices haServices;
+
+	/** all the services that the JobManager needs, such as BLOB service, factories, etc */
+	private final JobManagerServices jobManagerServices;
+
+	/** The number of JobManagers to launch (more than one simulates a high-availability setup) */
+	private final int numJobManagers;
+
+	/** The runners for the current job, one per JobManager. Non-null if a job is currently running */
+	private volatile JobManagerRunner[] runners;
+
+	/** flag marking the dispatcher as shut down */
+	private volatile boolean shutdown;
+
+
+	/**
+	 * Starts a mini cluster job dispatcher.
+	 *
+	 * <p>The dispatcher kicks off one JobManager per job, a behavior similar to a
+	 * non-highly-available setup.
+	 *
+	 * @param config The configuration of the mini cluster
+	 * @param rpcService The RPC service to be used by the JobManagers
+	 * @param haServices Access to the discovery, leader election, and recovery services
+	 *
+	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
+	 */
+	public MiniClusterJobDispatcher(
+			Configuration config,
+			RpcService rpcService,
+			HighAvailabilityServices haServices) throws Exception {
+		this(config, rpcService, haServices, 1);
+	}
+
+	/**
+	 * Starts a mini cluster job dispatcher.
+	 *
+	 * <p>The dispatcher may kick off more than one JobManager per job, thus simulating
+	 * a highly-available setup.
+	 *
+	 * @param config The configuration of the mini cluster
+	 * @param rpcService The RPC service to be used by the JobManagers
+	 * @param haServices Access to the discovery, leader election, and recovery services
+	 * @param numJobManagers The number of JobMasters to start for each job (must be at least 1).
+	 *
+	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
+	 */
+	public MiniClusterJobDispatcher(
+			Configuration config,
+			RpcService rpcService,
+			HighAvailabilityServices haServices,
+			int numJobManagers) throws Exception {
+
+		checkArgument(numJobManagers >= 1);
+		this.configuration = checkNotNull(config);
+		this.rpcService = checkNotNull(rpcService);
+		this.haServices = checkNotNull(haServices);
+		this.numJobManagers = numJobManagers;
+
+		LOG.info("Creating JobMaster services");
+		this.jobManagerServices = JobManagerServices.fromConfiguration(config);
+	}
+
+	// ------------------------------------------------------------------------
+	//  life cycle
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be
+	 * terminally failed.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			if (!shutdown) {
+				shutdown = true;
+
+				LOG.info("Shutting down the dispatcher");
+
+				// in this shutdown code we copy the references to the stack first,
+				// to avoid concurrent modification
+
+				JobManagerRunner[] runners = this.runners;
+				if (runners != null) {
+					this.runners = null;
+
+					Exception shutdownException = new Exception("The MiniCluster is shutting down");
+					for (JobManagerRunner runner : runners) {
+						runner.shutdown(shutdownException);
+					}
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  submitting jobs
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method executes a job in detached mode. The method returns immediately after the
+	 * JobManager runner(s) for the job have been started; it does not wait for the job to finish.
+	 *
+	 * @param job  The Flink job to execute
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch.
+	 */
+	public void runDetached(JobGraph job) throws JobExecutionException {
+		checkNotNull(job);
+
+		LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
+
+		synchronized (lock) {
+			checkState(!shutdown, "mini cluster is shut down");
+			checkState(runners == null, "mini cluster can only execute one job at a time");
+
+			OnCompletionActions onJobCompletion = new DetachedFinalizer(numJobManagers);
+
+			this.runners = startJobRunners(job, onJobCompletion);
+		}
+	}
+
+	/**
+	 * This method runs a job in blocking mode. The method returns only after the job
+	 * completed successfully, or after it failed terminally.
+	 *
+	 * @param job  The Flink job to execute
+	 * @return The result of the job execution
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+	 *         or if the job terminally failed.
+	 * @throws InterruptedException Thrown if the calling thread is interrupted while waiting
+	 *         for the job to complete.
+	 */
+	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+		checkNotNull(job);
+
+		LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
+		final BlockingJobSync onJobCompletion = new BlockingJobSync(job.getJobID(), numJobManagers);
+
+		synchronized (lock) {
+			checkState(!shutdown, "mini cluster is shut down");
+			checkState(runners == null, "mini cluster can only execute one job at a time");
+
+			this.runners = startJobRunners(job, onJobCompletion);
+		}
+
+		try {
+			return onJobCompletion.getResult();
+		}
+		finally {
+			// always clear the status for the next job
+			runners = null;
+		}
+	}
+
+	/**
+	 * Creates and starts one runner per JobManager for the given job. If creating or starting
+	 * any runner fails, every runner created so far is shut down before the failure is reported.
+	 */
+	private JobManagerRunner[] startJobRunners(JobGraph job, OnCompletionActions onCompletion) throws JobExecutionException {
+		LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
+
+		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
+		for (int i = 0; i < numJobManagers; i++) {
+			try {
+				runners[i] = new JobManagerRunner(job, configuration,
+						rpcService, haServices, jobManagerServices, onCompletion);
+				runners[i].start();
+			}
+			catch (Throwable t) {
+				// shut down all the runners created so far
+				// (runners[i] is still null if its constructor was the one that failed)
+				Exception shutdownCause = new Exception("Failed to properly start all mini cluster JobManagers", t);
+
+				for (int k = 0; k <= i; k++) {
+					try {
+						if (runners[k] != null) {
+							runners[k].shutdown(shutdownCause);
+						}
+					} catch (Throwable ignored) {
+						// silent shutdown
+					}
+				}
+
+				throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
+			}
+		}
+
+		return runners;
+	}
+
+	// ------------------------------------------------------------------------
+	//  test methods to simulate job master failures
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down one of the JobManager runners of the currently executing job, to simulate
+	 * a JobManager failure.
+	 *
+	 * @param which The index of the runner to shut down, in {@code [0, numJobManagers)}
+	 */
+	public void killJobMaster(int which) {
+		checkArgument(which >= 0 && which < numJobManagers, "no such job master");
+		checkState(!shutdown, "mini cluster is shut down");
+
+		JobManagerRunner[] runners = this.runners;
+		checkState(runners != null, "mini cluster is not executing a job right now");
+
+		runners[which].shutdown(new Throwable("kill JobManager"));
+	}
+
+	// ------------------------------------------------------------------------
+	//  utility classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Simple class that waits for all runners to have reported that they are done.
+	 * In the case of a high-availability test setup, there may be multiple runners.
+	 * After that, it marks the mini cluster as ready to receive new jobs.
+	 */
+	private class DetachedFinalizer implements OnCompletionActions {
+
+		private final AtomicInteger numJobManagersToWaitFor;
+
+		private DetachedFinalizer(int numJobManagersToWaitFor) {
+			this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult result) {
+			decrementCheckAndCleanup();
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			decrementCheckAndCleanup();
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			decrementCheckAndCleanup();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			decrementCheckAndCleanup();
+		}
+
+		private void decrementCheckAndCleanup() {
+			if (numJobManagersToWaitFor.decrementAndGet() == 0) {
+				MiniClusterJobDispatcher.this.runners = null;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This class is used to sync on blocking jobs across multiple runners.
+	 * Only after all runners reported back that they are finished, the
+	 * result will be released.
+	 *
+	 * <p>That way it is guaranteed that after the blocking job submit call returns,
+	 * the dispatcher is immediately free to accept another job.
+	 */
+	private static class BlockingJobSync implements OnCompletionActions {
+
+		private final JobID jobId;
+
+		private final CountDownLatch jobMastersToWaitFor;
+
+		private volatile Throwable jobException;
+
+		private volatile Throwable runnerException;
+
+		private volatile JobExecutionResult result;
+
+		BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
+			this.jobId = jobId;
+			this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult jobResult) {
+			this.result = jobResult;
+			jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			jobException = cause;
+			jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			this.jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			if (runnerException == null) {
+				runnerException = exception;
+			}
+			// a runner with a fatal error counts as having reported back (as in DetachedFinalizer);
+			// otherwise getResult() would block forever once all runners failed fatally
+			jobMastersToWaitFor.countDown();
+		}
+
+		public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
+			jobMastersToWaitFor.await();
+
+			final Throwable jobFailureCause = this.jobException;
+			final Throwable runnerException = this.runnerException;
+			final JobExecutionResult result = this.result;
+
+			// (1) we check if the job terminated with an exception
+			// (2) we check whether the job completed successfully
+			// (3) we check if we have exceptions from the JobManagers. the job may still have
+			//     completed successfully in that case, if multiple JobMasters were running
+			//     and other took over. only if all encounter a fatal error, the job cannot finish
+
+			if (jobFailureCause != null) {
+				if (jobFailureCause instanceof JobExecutionException) {
+					throw (JobExecutionException) jobFailureCause;
+				}
+				else {
+					throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
+				}
+			}
+			else if (result != null) {
+				return result;
+			}
+			else if (runnerException != null) {
+				throw new JobExecutionException(jobId,
+						"The job execution failed because all JobManagers encountered fatal errors", runnerException);
+			}
+			else {
+				throw new IllegalStateException("Bug: Job finished with neither error nor result.");
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
new file mode 100644
index 0000000..7721117
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+/**
+ * Callback interface through which a component reports fatal errors.
+ */
+public interface FatalErrorHandler {
+
+	/**
+	 * Called when a fatal error occurred.
+	 *
+	 * @param exception The fatal error
+	 */
+	void onFatalError(Throwable exception);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index cf709c8..9e3c3b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import akka.actor.ActorSystem;
 import com.typesafe.config.Config;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -28,6 +29,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
@@ -340,6 +342,16 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception {
 				return null;
 			}
+
+			// NOTE(review): placeholder implementation that always returns null; anything that obtains
+			// the CheckpointRecoveryFactory from these services receives null. Confirm no caller on the
+			// TaskExecutor side depends on a real factory.
+			@Override
+			public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+				return null;
+			}
+
+			// NOTE(review): placeholder implementation that always returns null; anything that obtains
+			// the SubmittedJobGraphStore from these services receives null. Confirm no caller on the
+			// TaskExecutor side depends on a real store.
+			@Override
+			public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+				return null;
+			}
 		};
 
 		// start all the TaskManager services (network stack,  library cache, ...)

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 2ac43be..1a5450d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -19,10 +19,13 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * A variant of the HighAvailabilityServices for testing. Each individual service can be set
  * to an arbitrary implementation, such as a mock or default service.
@@ -37,6 +40,9 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
 	private volatile LeaderElectionService resourceManagerLeaderElectionService;
 
+	private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+	private volatile SubmittedJobGraphStore submittedJobGraphStore;
 
 	// ------------------------------------------------------------------------
 	//  Setters for mock / testing implementations
@@ -58,6 +64,14 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 		this.resourceManagerLeaderElectionService = leaderElectionService;
 	}
 
+	/**
+	 * Registers the factory that {@link #getCheckpointRecoveryFactory()} hands out.
+	 *
+	 * @param factory the factory to return, e.g. a mock or default implementation
+	 */
+	public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory factory) {
+		checkpointRecoveryFactory = factory;
+	}
+
+	/**
+	 * Registers the store that {@link #getSubmittedJobGraphStore()} hands out.
+	 *
+	 * @param store the store to return, e.g. a mock or default implementation
+	 */
+	public void setSubmittedJobGraphStore(SubmittedJobGraphStore store) {
+		submittedJobGraphStore = store;
+	}
+
 	// ------------------------------------------------------------------------
 	//  HA Services Methods
 	// ------------------------------------------------------------------------
@@ -103,4 +117,27 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 			throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set");
 		}
 	}
+
+	/**
+	 * Returns the factory registered via {@link #setCheckpointRecoveryFactory(CheckpointRecoveryFactory)}.
+	 *
+	 * @throws IllegalStateException if no factory has been registered
+	 */
+	@Override
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+		// read the volatile field once so the null check and the returned value agree
+		final CheckpointRecoveryFactory registered = checkpointRecoveryFactory;
+
+		if (registered == null) {
+			throw new IllegalStateException("CheckpointRecoveryFactory has not been set");
+		}
+		return registered;
+	}
+
+	/**
+	 * Returns the store registered via {@link #setSubmittedJobGraphStore(SubmittedJobGraphStore)}.
+	 *
+	 * @throws IllegalStateException if no store has been registered
+	 */
+	@Override
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+		// read the volatile field once so the null check and the returned value agree
+		final SubmittedJobGraphStore registered = submittedJobGraphStore;
+
+		if (registered == null) {
+			throw new IllegalStateException("SubmittedJobGraphStore has not been set");
+		}
+		return registered;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
new file mode 100644
index 0000000..dc3b5fd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the lifecycle of {@link JobManagerRunner} (start, leadership grant/revocation, job
+ * completion and shutdown) against a mocked {@link JobMaster} and {@link LeaderElectionService}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobManagerRunner.class)
+public class JobManagerRunnerMockTest {
+
+	private JobManagerRunner runner;
+
+	private JobMaster jobManager;
+
+	private JobMasterGateway jobManagerGateway;
+
+	private LeaderElectionService leaderElectionService;
+
+	private TestingOnCompletionActions jobCompletion;
+
+	@Before
+	public void setUp() throws Exception {
+		jobManager = mock(JobMaster.class);
+		jobManagerGateway = mock(JobMasterGateway.class);
+		when(jobManager.getSelf()).thenReturn(jobManagerGateway);
+
+		// with @PrepareForTest(JobManagerRunner.class), any "new JobMaster(..)" inside the runner yields the mock
+		PowerMockito.whenNew(JobMaster.class).withAnyArguments().thenReturn(jobManager);
+
+		jobCompletion = new TestingOnCompletionActions();
+
+		leaderElectionService = mock(LeaderElectionService.class);
+		when(leaderElectionService.hasLeadership()).thenReturn(true);
+
+		HighAvailabilityServices haServices = mock(HighAvailabilityServices.class);
+		when(haServices.getJobMasterLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService);
+
+		runner = PowerMockito.spy(new JobManagerRunner(
+			new JobGraph("test"),
+			mock(Configuration.class),
+			mock(RpcService.class),
+			haServices,
+			mock(JobManagerServices.class),
+			jobCompletion));
+	}
+
+	@After
+	public void tearDown() throws Exception {
+	}
+
+	@Test
+	public void testStartAndShutdown() throws Exception {
+		runner.start();
+		verify(jobManager).init();
+		verify(jobManager).start();
+		verify(leaderElectionService).start(runner);
+
+		assertFalse(jobCompletion.isJobFinished());
+		assertFalse(jobCompletion.isJobFailed());
+
+		runner.shutdown();
+		verify(leaderElectionService).stop();
+		verify(jobManager).shutDown();
+	}
+
+	@Test
+	public void testShutdownBeforeGrantLeadership() throws Exception {
+		runner.start();
+		verify(jobManager).init();
+		verify(jobManager).start();
+		verify(leaderElectionService).start(runner);
+
+		runner.shutdown();
+		verify(leaderElectionService).stop();
+		verify(jobManager).shutDown();
+
+		assertFalse(jobCompletion.isJobFinished());
+		assertFalse(jobCompletion.isJobFailed());
+
+		// leadership granted after shutdown must neither start the job nor complete it
+		runner.grantLeadership(UUID.randomUUID());
+		verify(jobManagerGateway, never()).startJob();
+		assertFalse(jobCompletion.isJobFinished());
+		assertFalse(jobCompletion.isJobFailed());
+	}
+
+	@Test
+	public void testJobFinishedByOtherBeforeGrantLeadership() throws Exception {
+		runner.start();
+
+		// stub the spy via doReturn so the real isJobFinishedByOthers() is not invoked while stubbing
+		PowerMockito.doReturn(true).when(runner).isJobFinishedByOthers();
+		runner.grantLeadership(UUID.randomUUID());
+
+		// the runner should shut itself down and report the job as finished by another JobManager
+		verify(leaderElectionService).stop();
+		verify(jobManager).shutDown();
+		verify(jobManagerGateway, never()).startJob();
+
+		assertTrue(jobCompletion.isJobFinished());
+		assertTrue(jobCompletion.isJobFinishedByOther());
+		assertFalse(jobCompletion.isJobFailed());
+	}
+
+	@Test
+	public void testJobFinished() throws Exception {
+		runner.start();
+
+		runner.grantLeadership(UUID.randomUUID());
+		verify(jobManagerGateway).startJob();
+		assertFalse(jobCompletion.isJobFinished());
+
+		// the JobManager reports to the runner that the job finished
+		runner.jobFinished(mock(JobExecutionResult.class));
+
+		assertTrue(jobCompletion.isJobFinished());
+		assertFalse(jobCompletion.isJobFinishedByOther());
+		assertFalse(jobCompletion.isJobFailed());
+		verify(leaderElectionService).stop();
+		verify(jobManager).shutDown();
+		assertTrue(runner.isShutdown());
+	}
+
+	@Test
+	public void testJobFailed() throws Exception {
+		runner.start();
+
+		runner.grantLeadership(UUID.randomUUID());
+		verify(jobManagerGateway).startJob();
+		assertFalse(jobCompletion.isJobFinished());
+
+		// the JobManager reports to the runner that the job failed
+		runner.jobFailed(new Exception("failed manually"));
+
+		assertTrue(jobCompletion.isJobFailed());
+		assertFalse(jobCompletion.isJobFinished());
+		verify(leaderElectionService).stop();
+		verify(jobManager).shutDown();
+		assertTrue(runner.isShutdown());
+	}
+
+	@Test
+	public void testLeadershipRevoked() throws Exception {
+		runner.start();
+
+		runner.grantLeadership(UUID.randomUUID());
+		verify(jobManagerGateway).startJob();
+		assertFalse(jobCompletion.isJobFinished());
+
+		runner.revokeLeadership();
+		verify(jobManagerGateway).suspendJob(any(Throwable.class));
+		assertFalse(runner.isShutdown());
+	}
+
+	@Test
+	public void testRegainLeadership() throws Exception {
+		runner.start();
+
+		runner.grantLeadership(UUID.randomUUID());
+		verify(jobManagerGateway).startJob();
+		assertFalse(jobCompletion.isJobFinished());
+
+		runner.revokeLeadership();
+		verify(jobManagerGateway).suspendJob(any(Throwable.class));
+		assertFalse(runner.isShutdown());
+
+		runner.grantLeadership(UUID.randomUUID());
+		verify(jobManagerGateway, times(2)).startJob();
+	}
+
+	/**
+	 * Records which completion callback was invoked and rejects a second completion signal.
+	 */
+	private static class TestingOnCompletionActions implements OnCompletionActions {
+
+		private volatile JobExecutionResult result;
+
+		private volatile Throwable failedCause;
+
+		private volatile boolean finishedByOther;
+
+		@Override
+		public void jobFinished(JobExecutionResult result) {
+			checkNotCompletedYet();
+			this.result = result;
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			checkNotCompletedYet();
+			this.failedCause = cause;
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			checkNotCompletedYet();
+			this.finishedByOther = true;
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			jobFailed(exception);
+		}
+
+		boolean isJobFinished() {
+			return result != null || finishedByOther;
+		}
+
+		boolean isJobFinishedByOther() {
+			return finishedByOther;
+		}
+
+		boolean isJobFailed() {
+			return failedCause != null;
+		}
+
+		/** Fails with an IllegalStateException if any completion callback was already received. */
+		private void checkNotCompletedYet() {
+			checkState(!isJobFinished(), "job finished already");
+			checkState(!isJobFailed(), "job failed already");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9816484e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index 6363662..e05c8d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -19,23 +19,21 @@
 package org.apache.flink.runtime.rpc;
 
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 
-import org.junit.AfterClass;
 import org.junit.Test;
 
 import scala.Option;
 import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -57,19 +55,20 @@ public class RpcConnectionTest {
 
 			// we start the RPC service with a very long timeout to ensure that the test
 			// can only pass if the connection problem is not recognized merely via a timeout
-			rpcService = new AkkaRpcService(actorSystem, new Timeout(10000000, TimeUnit.SECONDS));
+			rpcService = new AkkaRpcService(actorSystem, Time.of(10000000, TimeUnit.SECONDS));
 
 			Future<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
 
-			Await.result(future, new FiniteDuration(10000000, TimeUnit.SECONDS));
+			future.get(10000000, TimeUnit.SECONDS);
 			fail("should never complete normally");
 		}
 		catch (TimeoutException e) {
 			fail("should not fail with a generic timeout exception");
 		}
-		catch (RpcConnectionException e) {
+		catch (ExecutionException e) {
 			// that is what we want
-			assertTrue("wrong error message", e.getMessage().contains("foo.bar.com.test.invalid"));
+			assertTrue(e.getCause() instanceof RpcConnectionException);
+			assertTrue("wrong error message", e.getCause().getMessage().contains("foo.bar.com.test.invalid"));
 		}
 		catch (Throwable t) {
 			fail("wrong exception: " + t);