You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:26 UTC

[07/52] [abbrv] flink git commit: [FLINK-4836] [cluster management] Add flink mini cluster (part 1)

[FLINK-4836] [cluster management] Add flink mini cluster (part 1)

This implements
  - mini cluster configuration
  - startup / shutdown of common services (rpc, ha)
  - startup / shutdown of JobManager and Dispatcher


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/106cb9e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/106cb9e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/106cb9e3

Branch: refs/heads/master
Commit: 106cb9e3ddb8ad019db99bc746e77c2ef48cb5e4
Parents: 9615f15
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Oct 15 00:25:41 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:23 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |  12 +-
 .../HighAvailabilityServicesUtils.java          |  17 +
 .../highavailability/ZookeeperHaServices.java   |   2 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   1 -
 .../jobmaster/MiniClusterJobDispatcher.java     | 394 -----------------
 .../flink/runtime/minicluster/MiniCluster.java  | 406 ++++++++++++++++++
 .../minicluster/MiniClusterConfiguration.java   | 147 +++++++
 .../minicluster/MiniClusterJobDispatcher.java   | 418 +++++++++++++++++++
 .../resourcemanager/ResourceManager.java        |   2 +-
 .../runtime/taskexecutor/JobLeaderService.java  |   3 +-
 .../TestingHighAvailabilityServices.java        |   2 +-
 .../runtime/minicluster/MiniClusterITCase.java  |  79 ++++
 12 files changed, 1075 insertions(+), 408 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index d1357a8..1069f2d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -110,8 +110,6 @@ public final class ExceptionUtils {
 		}
 	}
 
-
-
 	/**
 	 * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to
 	 * throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other exceptions
@@ -214,10 +212,8 @@ public final class ExceptionUtils {
 		}
 	}
 
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private ExceptionUtils() {
-		throw new RuntimeException();
-	}
+	// ------------------------------------------------------------------------
+
+	/** Private constructor to prevent instantiation. */
+	private ExceptionUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index f3da847..9113309 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -24,6 +24,23 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
 public class HighAvailabilityServicesUtils {
 
+	public static HighAvailabilityServices createAvailableOrEmbeddedServices(Configuration config) throws Exception {
+		HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);
+
+		switch (highAvailabilityMode) {
+			case NONE:
+				return new EmbeddedNonHaServices();
+
+			case ZOOKEEPER:
+				throw new UnsupportedOperationException("ZooKeeper high availability services " +
+						"have not been implemented yet.");
+
+			default:
+				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
+		}
+	}
+	
+	
 	public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration) throws Exception {
 		HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index be19c60..e38840b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -113,7 +113,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultAddress) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
 		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 74ca6f3..3313d8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -289,7 +289,6 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	@Override
 	public void jobFinishedByOther() {
 		try {
-			unregisterJobFromHighAvailability();
 			shutdownInternally();
 		}
 		finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
deleted file mode 100644
index 019ccfe..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters
- * upon receiving jobs.
- */
-public class MiniClusterJobDispatcher {
-
-	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
-
-	// ------------------------------------------------------------------------
-
-	/** lock to ensure that this dispatcher executes only one job at a time */
-	private final Object lock = new Object();
-
-	/** the configuration with which the mini cluster was started */
-	private final Configuration configuration;
-
-	/** the RPC service to use by the job managers */
-	private final RpcService rpcService;
-
-	/** services for discovery, leader election, and recovery */
-	private final HighAvailabilityServices haServices;
-
-	/** al the services that the JobManager needs, such as BLOB service, factories, etc */
-	private final JobManagerServices jobManagerServices;
-
-	/** Registry for all metrics in the mini cluster */
-	private final MetricRegistry metricRegistry;
-
-	/** The number of JobManagers to launch (more than one simulates a high-availability setup) */
-	private final int numJobManagers;
-
-	/** The runner for the job and master. non-null if a job is currently running */
-	private volatile JobManagerRunner[] runners;
-
-	/** flag marking the dispatcher as hut down */
-	private volatile boolean shutdown;
-
-
-	/**
-	 * Starts a mini cluster job dispatcher.
-	 * 
-	 * <p>The dispatcher kicks off one JobManager per job, a behavior similar to a
-	 * non-highly-available setup.
-	 * 
-	 * @param config The configuration of the mini cluster
-	 * @param haServices Access to the discovery, leader election, and recovery services
-	 * 
-	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
-	 */
-	public MiniClusterJobDispatcher(
-			Configuration config,
-			RpcService rpcService,
-			HighAvailabilityServices haServices,
-			MetricRegistry metricRegistry) throws Exception {
-		this(config, rpcService, haServices, metricRegistry, 1);
-	}
-
-	/**
-	 * Starts a mini cluster job dispatcher.
-	 *
-	 * <p>The dispatcher may kick off more than one JobManager per job, thus simulating
-	 * a highly-available setup.
-	 * 
-	 * @param config The configuration of the mini cluster
-	 * @param haServices Access to the discovery, leader election, and recovery services
-	 * @param numJobManagers The number of JobMasters to start for each job.
-	 * 
-	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
-	 */
-	public MiniClusterJobDispatcher(
-			Configuration config,
-			RpcService rpcService,
-			HighAvailabilityServices haServices,
-			MetricRegistry metricRegistry,
-			int numJobManagers) throws Exception {
-
-		checkArgument(numJobManagers >= 1);
-		this.configuration = checkNotNull(config);
-		this.rpcService = checkNotNull(rpcService);
-		this.haServices = checkNotNull(haServices);
-		this.metricRegistry = checkNotNull(metricRegistry);
-		this.numJobManagers = numJobManagers;
-
-		LOG.info("Creating JobMaster services");
-		this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
-	}
-
-	// ------------------------------------------------------------------------
-	//  life cycle
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be
-	 * terminally failed.
-	 */
-	public void shutdown() {
-		synchronized (lock) {
-			if (!shutdown) {
-				shutdown = true;
-
-				LOG.info("Shutting down the dispatcher");
-
-				// in this shutdown code we copy the references to the stack first,
-				// to avoid concurrent modification
-
-				JobManagerRunner[] runners = this.runners;
-				if (runners != null) {
-					this.runners = null;
-
-					for (JobManagerRunner runner : runners) {
-						runner.shutdown();
-					}
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  submitting jobs
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This method executes a job in detached mode. The method returns immediately after the job
-	 * has been added to the
-	 *
-	 * @param job  The Flink job to execute
-	 *
-	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
-	 *         or if the job terminally failed.
-	 */
-	public void runDetached(JobGraph job) throws JobExecutionException {
-		checkNotNull(job);
-
-		LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
-
-		synchronized (lock) {
-			checkState(!shutdown, "mini cluster is shut down");
-			checkState(runners == null, "mini cluster can only execute one job at a time");
-
-			DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
-
-			this.runners = startJobRunners(job, finalizer, finalizer);
-		}
-	}
-
-	/**
-	 * This method runs a job in blocking mode. The method returns only after the job
-	 * completed successfully, or after it failed terminally.
-	 *
-	 * @param job  The Flink job to execute 
-	 * @return The result of the job execution
-	 *
-	 * @throws JobExecutionException Thrown if anything went amiss during initial job lauch,
-	 *         or if the job terminally failed.
-	 */
-	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
-		checkNotNull(job);
-		
-		LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
-		final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
-
-		synchronized (lock) {
-			checkState(!shutdown, "mini cluster is shut down");
-			checkState(runners == null, "mini cluster can only execute one job at a time");
-
-			this.runners = startJobRunners(job, sync, sync);
-		}
-
-		try {
-			return sync.getResult();
-		}
-		finally {
-			// always clear the status for the next job
-			runners = null;
-		}
-	}
-
-	private JobManagerRunner[] startJobRunners(
-			JobGraph job,
-			OnCompletionActions onCompletion,
-			FatalErrorHandler errorHandler) throws JobExecutionException {
-		LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
-
-		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
-		for (int i = 0; i < numJobManagers; i++) {
-			try {
-				runners[i] = new JobManagerRunner(job, configuration,
-						rpcService, haServices, jobManagerServices, metricRegistry, 
-						onCompletion, errorHandler);
-				runners[i].start();
-			}
-			catch (Throwable t) {
-				// shut down all the ones so far
-				for (int k = 0; k <= i; k++) {
-					try {
-						if (runners[i] != null) {
-							runners[i].shutdown();
-						}
-					} catch (Throwable ignored) {
-						// silent shutdown
-					}
-				}
-
-				throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
-			}
-		}
-
-		return runners;
-	}
-
-	// ------------------------------------------------------------------------
-	//  test methods to simulate job master failures
-	// ------------------------------------------------------------------------
-
-//	public void killJobMaster(int which) {
-//		checkArgument(which >= 0 && which < numJobManagers, "no such job master");
-//		checkState(!shutdown, "mini cluster is shut down");
-//
-//		JobManagerRunner[] runners = this.runners;
-//		checkState(runners != null, "mini cluster it not executing a job right now");
-//
-//		runners[which].shutdown(new Throwable("kill JobManager"));
-//	}
-
-	// ------------------------------------------------------------------------
-	//  utility classes
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Simple class that waits for all runners to have reported that they are done.
-	 * In the case of a high-availability test setup, there may be multiple runners.
-	 * After that, it marks the mini cluster as ready to receive new jobs.
-	 */
-	private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
-
-		private final AtomicInteger numJobManagersToWaitFor;
-
-		private DetachedFinalizer(int numJobManagersToWaitFor) {
-			this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor);
-		}
-
-		@Override
-		public void jobFinished(JobExecutionResult result) {
-			decrementCheckAndCleanup();
-		}
-
-		@Override
-		public void jobFailed(Throwable cause) {
-			decrementCheckAndCleanup();
-		}
-
-		@Override
-		public void jobFinishedByOther() {
-			decrementCheckAndCleanup();
-		}
-
-		@Override
-		public void onFatalError(Throwable exception) {
-			decrementCheckAndCleanup();
-		}
-
-		private void decrementCheckAndCleanup() {
-			if (numJobManagersToWaitFor.decrementAndGet() == 0) {
-				MiniClusterJobDispatcher.this.runners = null;
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This class is used to sync on blocking jobs across multiple runners.
-	 * Only after all runners reported back that they are finished, the
-	 * result will be released.
-	 * 
-	 * That way it is guaranteed that after the blocking job submit call returns,
-	 * the dispatcher is immediately free to accept another job.
-	 */
-	private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
-
-		private final JobID jobId;
-
-		private final CountDownLatch jobMastersToWaitFor;
-
-		private volatile Throwable jobException;
-
-		private volatile Throwable runnerException;
-
-		private volatile JobExecutionResult result;
-		
-		BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
-			this.jobId = jobId;
-			this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
-		}
-
-		@Override
-		public void jobFinished(JobExecutionResult jobResult) {
-			this.result = jobResult;
-			jobMastersToWaitFor.countDown();
-		}
-
-		@Override
-		public void jobFailed(Throwable cause) {
-			jobException = cause;
-			jobMastersToWaitFor.countDown();
-		}
-
-		@Override
-		public void jobFinishedByOther() {
-			this.jobMastersToWaitFor.countDown();
-		}
-
-		@Override
-		public void onFatalError(Throwable exception) {
-			if (runnerException == null) {
-				runnerException = exception;
-			}
-		}
-
-		public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
-			jobMastersToWaitFor.await();
-
-			final Throwable jobFailureCause = this.jobException;
-			final Throwable runnerException = this.runnerException;
-			final JobExecutionResult result = this.result;
-
-			// (1) we check if the job terminated with an exception
-			// (2) we check whether the job completed successfully
-			// (3) we check if we have exceptions from the JobManagers. the job may still have
-			//     completed successfully in that case, if multiple JobMasters were running
-			//     and other took over. only if all encounter a fatal error, the job cannot finish
-
-			if (jobFailureCause != null) {
-				if (jobFailureCause instanceof JobExecutionException) {
-					throw (JobExecutionException) jobFailureCause;
-				}
-				else {
-					throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
-				}
-			}
-			else if (result != null) {
-				return result;
-			}
-			else if (runnerException != null) {
-				throw new JobExecutionException(jobId,
-						"The job execution failed because all JobManagers encountered fatal errors", runnerException);
-			}
-			else {
-				throw new IllegalStateException("Bug: Job finished with neither error nor result.");
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
new file mode 100644
index 0000000..1ee38e0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import akka.actor.ActorSystem;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.ExceptionUtils;
+
+import scala.Option;
+import scala.Tuple2;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+
+public class MiniCluster {
+
+	/** The lock to guard startup / shutdown / manipulation methods */
+	private final Object lock = new Object();
+
+	/** The configuration for this mini cluster */
+	private final MiniClusterConfiguration config;
+
+	@GuardedBy("lock")
+	private MetricRegistry metricRegistry;
+
+	@GuardedBy("lock")
+	private RpcService commonRpcService;
+
+	@GuardedBy("lock")
+	private RpcService[] jobManagerRpcServices;
+
+	@GuardedBy("lock")
+	private RpcService[] taskManagerRpcServices;
+
+	@GuardedBy("lock")
+	private HighAvailabilityServices haServices;
+
+	@GuardedBy("lock")
+	private MiniClusterJobDispatcher jobDispatcher;
+
+	/** Flag marking the mini cluster as started/running */
+	@GuardedBy("lock")
+	private boolean running;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new mini cluster with the default configuration:
+	 * <ul>
+	 *     <li>One JobManager</li>
+	 *     <li>One TaskManager</li>
+	 *     <li>One task slot in the TaskManager</li>
+	 *     <li>All components share the same RPC subsystem (minimizes communication overhead)</li>
+	 * </ul>
+	 */
+	public MiniCluster() {
+		this(new MiniClusterConfiguration());
+	}
+
+	/**
+	 * 
+	 * @param config The configuration for the mini cluster
+	 */
+	public MiniCluster(MiniClusterConfiguration config) {
+		this.config = checkNotNull(config, "config may not be null");
+	}
+
+	/**
+	 * Creates a mini cluster based on the given configuration.
+	 * 
+	 * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead. 
+	 * @see #MiniCluster(MiniClusterConfiguration)
+	 */
+	@Deprecated
+	public MiniCluster(Configuration config) {
+		this(createConfig(config, true));
+	}
+
+	/**
+	 * Creates a mini cluster based on the given configuration, starting one or more
+	 * RPC services, depending on the given flag.
+	 *
+	 * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} instead. 
+	 * @see #MiniCluster(MiniClusterConfiguration)
+	 */
+	@Deprecated
+	public MiniCluster(Configuration config, boolean singleRpcService) {
+		this(createConfig(config, singleRpcService));
+	}
+
+	// ------------------------------------------------------------------------
+	//  life cycle
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks if the mini cluster was started and is running.
+	 */
+	public boolean isRunning() {
+		return running;
+	}
+
+	/**
+	 * Starts the mini cluster, based on the configured properties.
+	 * 
+	 * @throws Exception This method passes on any exception that occurs during the startup of
+	 *                   the mini cluster.
+	 */
+	public void start() throws Exception {
+		synchronized (lock) {
+			checkState(!running, "FlinkMiniCluster is already running");
+
+			final Configuration configuration = new UnmodifiableConfiguration(config.getConfiguration());
+			final Time rpcTimeout = config.getRpcTimeout();
+			final int numJobManagers = config.getNumJobManagers();
+			final int numTaskManagers = config.getNumTaskManagers();
+			final boolean singleRpc = config.getUseSingleRpcSystem();
+
+			try {
+				metricRegistry = createMetricRegistry(configuration);
+
+				RpcService[] jobManagerRpcServices = new RpcService[numJobManagers];
+				RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];
+
+				// bring up all the RPC services
+				if (singleRpc) {
+					// one common RPC for all
+					commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+
+					// set that same RPC service for all JobManagers and TaskManagers
+					for (int i = 0; i < numJobManagers; i++) {
+						jobManagerRpcServices[i] = commonRpcService;
+					}
+					for (int i = 0; i < numTaskManagers; i++) {
+						taskManagerRpcServices[i] = commonRpcService;
+					}
+				}
+				else {
+					// start a new service per component, possibly with custom bind addresses
+					final String jobManagerBindAddress = config.getJobManagerBindAddress();
+					final String taskManagerBindAddress = config.getTaskManagerBindAddress();
+
+					for (int i = 0; i < numJobManagers; i++) {
+						jobManagerRpcServices[i] = createRpcService(
+								configuration, rpcTimeout, true, jobManagerBindAddress);
+					}
+
+					for (int i = 0; i < numTaskManagers; i++) {
+						taskManagerRpcServices[i] = createRpcService(
+								configuration, rpcTimeout, true, taskManagerBindAddress);
+					}
+
+					this.jobManagerRpcServices = jobManagerRpcServices;
+					this.taskManagerRpcServices = taskManagerRpcServices;
+				}
+
+				// create the high-availability services
+				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration);
+
+				// bring up the dispatcher that launches JobManagers when jobs submitted
+				jobDispatcher = new MiniClusterJobDispatcher(
+						configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
+			}
+			catch (Exception e) {
+				// cleanup everything
+				try {
+					shutdownInternally();
+				} catch (Exception ee) {
+					e.addSuppressed(ee);
+				}
+				throw e;
+			}
+
+			// now officially mark this as running
+			running = true;
+		}
+	}
+
+	/**
+	 * Shuts down the mini cluster, failing all currently executing jobs.
+	 * The mini cluster can be started again by calling the {@link #start()} method again.
+	 * 
+	 * <p>This method shuts down all started services and components,
+	 * even if an exception occurs in the process of shutting down some component. 
+	 * 
+	 * @throws Exception Thrown, if the shutdown did not complete cleanly.
+	 */
+	public void shutdown() throws Exception {
+		synchronized (lock) {
+			if (running) {
+				try {
+					shutdownInternally();
+				} finally {
+					running = false;
+				}
+			}
+		}
+	}
+
+	@GuardedBy("lock")
+	private void shutdownInternally() throws Exception {
+		// this should always be called under the lock
+		assert Thread.holdsLock(lock);
+
+		// collect the first exception, but continue and add all successive
+		// exceptions as suppressed
+		Throwable exception = null;
+
+		// cancel all jobs and shut down the job dispatcher
+		if (jobDispatcher != null) {
+			try {
+				jobDispatcher.shutdown();
+			} catch (Exception e) {
+				exception = firstOrSuppressed(e, exception);
+			}
+			jobDispatcher = null;
+		}
+
+		// shut down high-availability services
+		if (haServices != null) {
+			try {
+				haServices.shutdown();
+			} catch (Exception e) {
+				exception = firstOrSuppressed(e, exception);
+			}
+			haServices = null;
+		}
+
+		// shut down the RpcServices
+		if (commonRpcService != null) {
+			exception = shutDownRpc(commonRpcService, exception);
+			commonRpcService = null;
+		}
+		if (jobManagerRpcServices != null) {
+			for (RpcService service : jobManagerRpcServices) {
+				exception = shutDownRpc(service, exception);
+			}
+			jobManagerRpcServices = null;
+		}
+		if (taskManagerRpcServices != null) {
+			for (RpcService service : taskManagerRpcServices) {
+				exception = shutDownRpc(service, exception);
+			}
+			taskManagerRpcServices = null;
+		}
+
+		// metrics shutdown
+		if (metricRegistry != null) {
+			metricRegistry.shutdown();
+			metricRegistry = null;
+		}
+
+		// if anything went wrong, throw the first error with all the additional suppressed exceptions
+		if (exception != null) {
+			ExceptionUtils.rethrowException(exception, "Error while shutting down mini cluster");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  running jobs
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method executes a job in detached mode. The method returns immediately after the job
+	 * has been added to the
+	 *
+	 * @param job  The Flink job to execute
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+	 *         or if the job terminally failed.
+	 */
+	public void runDetached(JobGraph job) throws JobExecutionException {
+		checkNotNull(job, "job is null");
+
+		synchronized (lock) {
+			checkState(running, "mini cluster is not running");
+			jobDispatcher.runDetached(job);
+		}
+	}
+
+	/**
+	 * This method runs a job in blocking mode. The method returns only after the job
+	 * completed successfully, or after it failed terminally.
+	 *
+	 * @param job  The Flink job to execute 
+	 * @return The result of the job execution
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+	 *         or if the job terminally failed.
+	 */
+	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+		checkNotNull(job, "job is null");
+
+		MiniClusterJobDispatcher dispatcher;
+		synchronized (lock) {
+			checkState(running, "mini cluster is not running");
+			dispatcher = this.jobDispatcher;
+		}
+
+		return dispatcher.runJobBlocking(job);
+	}
+
+	// ------------------------------------------------------------------------
+	//  factories - can be overridden by subclasses to alter behavior
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory method to create the metric registry for the mini cluster
+	 * 
+	 * @param config The configuration of the mini cluster
+	 */
+	protected MetricRegistry createMetricRegistry(Configuration config) {
+		return new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+	}
+
+	/**
+	 * Factory method to instantiate the RPC service.
+	 * 
+	 * @param config
+	 *            The configuration of the mini cluster
+	 * @param askTimeout
+	 *            The default RPC timeout for asynchronous "ask" requests.
+	 * @param remoteEnabled
+	 *            True, if the RPC service should be reachable from other (remote) RPC services.
+	 * @param bindAddress
+	 *            The address to bind the RPC service to. Only relevant when "remoteEnabled" is true.
+	 * 
+	 * @return The instantiated RPC service
+	 */
+	protected RpcService createRpcService(
+			Configuration config,
+			Time askTimeout,
+			boolean remoteEnabled,
+			String bindAddress) {
+
+		ActorSystem actorSystem;
+		if (remoteEnabled) {
+			Tuple2<String, Object> remoteSettings = new Tuple2<String, Object>(bindAddress, 0);
+			actorSystem = AkkaUtils.createActorSystem(config, Option.apply(remoteSettings));
+		} else {
+			actorSystem = AkkaUtils.createLocalActorSystem(config);
+		}
+
+		return new AkkaRpcService(actorSystem, askTimeout);
+	}
+
+	// ------------------------------------------------------------------------
+	//  miscellaneous utilities
+	// ------------------------------------------------------------------------
+
+	private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) {
+		try {
+			if (rpcService != null) {
+				rpcService.stopService();
+			}
+			return priorException;
+		}
+		catch (Throwable t) {
+			return firstOrSuppressed(t, priorException);
+		}
+	}
+
+	private static MiniClusterConfiguration createConfig(Configuration cfg, boolean singleActorSystem) {
+		MiniClusterConfiguration config = cfg == null ?
+				new MiniClusterConfiguration() :
+				new MiniClusterConfiguration(cfg);
+
+		if (!singleActorSystem) {
+			config.setUseRpcServicePerComponent();
+		}
+
+		return config;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
new file mode 100644
index 0000000..a8d7b10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class MiniClusterConfiguration {
+
+	private final Configuration config;
+
+	private boolean singleRpcService = true;
+
+	private int numJobManagers = 1;
+
+	private int numTaskManagers = 1;
+
+	private String commonBindAddress;
+
+	// ------------------------------------------------------------------------
+	//  Construction
+	// ------------------------------------------------------------------------
+
+	public MiniClusterConfiguration() {
+		this.config = new Configuration();
+	}
+
+	public MiniClusterConfiguration(Configuration config) {
+		checkNotNull(config);
+		this.config = new Configuration(config);
+	}
+
+	// ------------------------------------------------------------------------
+	//  setters
+	// ------------------------------------------------------------------------
+
+	public void addConfiguration(Configuration config) {
+		checkNotNull(config, "configuration must not be null");
+		this.config.addAll(config);
+	}
+
+	public void setUseSingleRpcService() {
+		this.singleRpcService = true;
+	}
+
+	public void setUseRpcServicePerComponent() {
+		this.singleRpcService = false;
+	}
+
+	public void setNumJobManagers(int numJobManagers) {
+		checkArgument(numJobManagers >= 1, "must have at least one JobManager");
+		this.numJobManagers = numJobManagers;
+	}
+
+	public void setNumTaskManagers(int numTaskManagers) {
+		checkArgument(numTaskManagers >= 1, "must have at least one TaskManager");
+		this.numTaskManagers = numTaskManagers;
+	}
+
+	public void setNumTaskManagerSlots(int numTaskSlots) {
+		checkArgument(numTaskSlots >= 1, "must have at least one task slot per TaskManager");
+		this.config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numTaskSlots);
+	}
+
+	public void setCommonRpcBindAddress(String bindAddress) {
+		checkNotNull(bindAddress, "bind address must not be null");
+		this.commonBindAddress = bindAddress;
+	}
+
+	// ------------------------------------------------------------------------
+	//  getters
+	// ------------------------------------------------------------------------
+
+	public Configuration getConfiguration() {
+		return config;
+	}
+
+	public boolean getUseSingleRpcSystem() {
+		return singleRpcService;
+	}
+
+	public int getNumJobManagers() {
+		return numJobManagers;
+	}
+
+	public int getNumTaskManagers() {
+		return numTaskManagers;
+	}
+
+	public int getNumSlotsPerTaskManager() {
+		return config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+	}
+
+	public String getJobManagerBindAddress() {
+		return commonBindAddress != null ?
+				commonBindAddress :
+				config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+	}
+
+	public String getTaskManagerBindAddress() {
+		return commonBindAddress != null ?
+				commonBindAddress :
+				config.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+	}
+
+	public Time getRpcTimeout() {
+		FiniteDuration duration = AkkaUtils.getTimeout(config);
+		return Time.of(duration.length(), duration.unit());
+	}
+
+	// ------------------------------------------------------------------------
+	//  utils
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "MiniClusterConfiguration{" +
+				"singleRpcService=" + singleRpcService +
+				", numJobManagers=" + numJobManagers +
+				", numTaskManagers=" + numTaskManagers +
+				", commonBindAddress='" + commonBindAddress + '\'' +
+				", config=" + config +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
new file mode 100644
index 0000000..d99eff6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters
+ * upon receiving jobs.
+ */
+public class MiniClusterJobDispatcher {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
+
+	// ------------------------------------------------------------------------
+
+	/** lock to ensure that this dispatcher executes only one job at a time */
+	private final Object lock = new Object();
+
+	/** the configuration with which the mini cluster was started */
+	private final Configuration configuration;
+
+	/** the RPC services to use by the job managers */
+	private final RpcService[] rpcServices;
+
+	/** services for discovery, leader election, and recovery */
+	private final HighAvailabilityServices haServices;
+
+	/** al the services that the JobManager needs, such as BLOB service, factories, etc */
+	private final JobManagerServices jobManagerServices;
+
+	/** Registry for all metrics in the mini cluster */
+	private final MetricRegistry metricRegistry;
+
+	/** The number of JobManagers to launch (more than one simulates a high-availability setup) */
+	private final int numJobManagers;
+
+	/** The runner for the job and master. non-null if a job is currently running */
+	private volatile JobManagerRunner[] runners;
+
+	/** flag marking the dispatcher as hut down */
+	private volatile boolean shutdown;
+
+
+	/**
+	 * Starts a mini cluster job dispatcher.
+	 * 
+	 * <p>The dispatcher kicks off one JobManager per job, a behavior similar to a
+	 * non-highly-available setup.
+	 * 
+	 * @param config The configuration of the mini cluster
+	 * @param haServices Access to the discovery, leader election, and recovery services
+	 * 
+	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
+	 */
+	public MiniClusterJobDispatcher(
+			Configuration config,
+			RpcService rpcService,
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry) throws Exception {
+		this(config, haServices, metricRegistry, 1, new RpcService[] { rpcService });
+	}
+
+	/**
+	 * Starts a mini cluster job dispatcher.
+	 *
+	 * <p>The dispatcher may kick off more than one JobManager per job, thus simulating
+	 * a highly-available setup.
+	 * 
+	 * @param config The configuration of the mini cluster
+	 * @param haServices Access to the discovery, leader election, and recovery services
+	 * @param numJobManagers The number of JobMasters to start for each job.
+	 * 
+	 * @throws Exception Thrown, if the services for the JobMaster could not be started.
+	 */
+	public MiniClusterJobDispatcher(
+			Configuration config,
+			HighAvailabilityServices haServices,
+			MetricRegistry metricRegistry,
+			int numJobManagers,
+			RpcService[] rpcServices) throws Exception {
+		
+		checkArgument(numJobManagers >= 1);
+		checkArgument(rpcServices.length == numJobManagers);
+		
+		this.configuration = checkNotNull(config);
+		this.rpcServices = rpcServices;
+		this.haServices = checkNotNull(haServices);
+		this.metricRegistry = checkNotNull(metricRegistry);
+		this.numJobManagers = numJobManagers;
+
+		LOG.info("Creating JobMaster services");
+		this.jobManagerServices = JobManagerServices.fromConfiguration(config, haServices);
+	}
+
+	// ------------------------------------------------------------------------
+	//  life cycle
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be
+	 * terminally failed.
+	 */
+	public void shutdown() {
+		synchronized (lock) {
+			if (!shutdown) {
+				shutdown = true;
+
+				LOG.info("Shutting down the dispatcher");
+
+				// in this shutdown code we copy the references to the stack first,
+				// to avoid concurrent modification
+
+				JobManagerRunner[] runners = this.runners;
+				if (runners != null) {
+					this.runners = null;
+
+					for (JobManagerRunner runner : runners) {
+						runner.shutdown();
+					}
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  submitting jobs
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method executes a job in detached mode. The method returns immediately after the job
+	 * has been added to the
+	 *
+	 * @param job  The Flink job to execute
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+	 *         or if the job terminally failed.
+	 */
+	public void runDetached(JobGraph job) throws JobExecutionException {
+		checkNotNull(job);
+
+		LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID());
+
+		synchronized (lock) {
+			checkState(!shutdown, "mini cluster is shut down");
+			checkState(runners == null, "mini cluster can only execute one job at a time");
+
+			DetachedFinalizer finalizer = new DetachedFinalizer(numJobManagers);
+
+			this.runners = startJobRunners(job, finalizer, finalizer);
+		}
+	}
+
+	/**
+	 * This method runs a job in blocking mode. The method returns only after the job
+	 * completed successfully, or after it failed terminally.
+	 *
+	 * @param job  The Flink job to execute 
+	 * @return The result of the job execution
+	 *
+	 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+	 *         or if the job terminally failed.
+	 */
+	public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+		checkNotNull(job);
+		
+		LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID());
+		final BlockingJobSync sync = new BlockingJobSync(job.getJobID(), numJobManagers);
+
+		synchronized (lock) {
+			checkState(!shutdown, "mini cluster is shut down");
+			checkState(runners == null, "mini cluster can only execute one job at a time");
+
+			this.runners = startJobRunners(job, sync, sync);
+		}
+
+		try {
+			return sync.getResult();
+		}
+		finally {
+			// always clear the status for the next job
+			runners = null;
+		}
+	}
+
+	private JobManagerRunner[] startJobRunners(
+			JobGraph job,
+			OnCompletionActions onCompletion,
+			FatalErrorHandler errorHandler) throws JobExecutionException {
+
+		LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
+
+		// we first need to mark the job as running in the HA services, so that the
+		// JobManager leader will recognize that it as work to do
+		try {
+			haServices.getRunningJobsRegistry().setJobRunning(job.getJobID());
+		}
+		catch (Throwable t) {
+			throw new JobExecutionException(job.getJobID(),
+					"Could not register the job at the high-availability services", t);
+		}
+
+		// start all JobManagers
+		JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
+		for (int i = 0; i < numJobManagers; i++) {
+			try {
+				runners[i] = new JobManagerRunner(job, configuration,
+						rpcServices[i], haServices, jobManagerServices, metricRegistry, 
+						onCompletion, errorHandler);
+				runners[i].start();
+			}
+			catch (Throwable t) {
+				// shut down all the ones so far
+				for (int k = 0; k <= i; k++) {
+					try {
+						if (runners[i] != null) {
+							runners[i].shutdown();
+						}
+					} catch (Throwable ignored) {
+						// silent shutdown
+					}
+				}
+
+				// un-register the job from the high.availability services
+				try {
+					haServices.getRunningJobsRegistry().setJobFinished(job.getJobID());
+				}
+				catch (Throwable tt) {
+					LOG.warn("Could not properly unregister job from high-availability services", tt);
+				}
+
+				throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
+			}
+		}
+
+		return runners;
+	}
+
+	// ------------------------------------------------------------------------
+	//  test methods to simulate job master failures
+	// ------------------------------------------------------------------------
+
+//	public void killJobMaster(int which) {
+//		checkArgument(which >= 0 && which < numJobManagers, "no such job master");
+//		checkState(!shutdown, "mini cluster is shut down");
+//
+//		JobManagerRunner[] runners = this.runners;
+//		checkState(runners != null, "mini cluster it not executing a job right now");
+//
+//		runners[which].shutdown(new Throwable("kill JobManager"));
+//	}
+
+	// ------------------------------------------------------------------------
+	//  utility classes
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Simple class that waits for all runners to have reported that they are done.
+	 * In the case of a high-availability test setup, there may be multiple runners.
+	 * After that, it marks the mini cluster as ready to receive new jobs.
+	 */
+	private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
+
+		private final AtomicInteger numJobManagersToWaitFor;
+
+		private DetachedFinalizer(int numJobManagersToWaitFor) {
+			this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult result) {
+			decrementCheckAndCleanup();
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			decrementCheckAndCleanup();
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			decrementCheckAndCleanup();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			decrementCheckAndCleanup();
+		}
+
+		private void decrementCheckAndCleanup() {
+			if (numJobManagersToWaitFor.decrementAndGet() == 0) {
+				MiniClusterJobDispatcher.this.runners = null;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This class is used to sync on blocking jobs across multiple runners.
+	 * Only after all runners reported back that they are finished, the
+	 * result will be released.
+	 * 
+	 * That way it is guaranteed that after the blocking job submit call returns,
+	 * the dispatcher is immediately free to accept another job.
+	 */
+	private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
+
+		private final JobID jobId;
+
+		private final CountDownLatch jobMastersToWaitFor;
+
+		private volatile Throwable jobException;
+
+		private volatile Throwable runnerException;
+
+		private volatile JobExecutionResult result;
+		
+		BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
+			this.jobId = jobId;
+			this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
+		}
+
+		@Override
+		public void jobFinished(JobExecutionResult jobResult) {
+			this.result = jobResult;
+			jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void jobFailed(Throwable cause) {
+			jobException = cause;
+			jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void jobFinishedByOther() {
+			this.jobMastersToWaitFor.countDown();
+		}
+
+		@Override
+		public void onFatalError(Throwable exception) {
+			if (runnerException == null) {
+				runnerException = exception;
+			}
+		}
+
+		public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
+			jobMastersToWaitFor.await();
+
+			final Throwable jobFailureCause = this.jobException;
+			final Throwable runnerException = this.runnerException;
+			final JobExecutionResult result = this.result;
+
+			// (1) we check if the job terminated with an exception
+			// (2) we check whether the job completed successfully
+			// (3) we check if we have exceptions from the JobManagers. the job may still have
+			//     completed successfully in that case, if multiple JobMasters were running
+			//     and other took over. only if all encounter a fatal error, the job cannot finish
+
+			if (jobFailureCause != null) {
+				if (jobFailureCause instanceof JobExecutionException) {
+					throw (JobExecutionException) jobFailureCause;
+				}
+				else {
+					throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause);
+				}
+			}
+			else if (result != null) {
+				return result;
+			}
+			else if (runnerException != null) {
+				throw new JobExecutionException(jobId,
+						"The job execution failed because all JobManagers encountered fatal errors", runnerException);
+			}
+			else {
+				throw new IllegalStateException("Bug: Job finished with neither error nor result.");
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 6f6d525..3122804 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -188,7 +188,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		} else {
 			try {
 				LeaderRetrievalService jobMasterLeaderRetriever =
-					highAvailabilityServices.getJobManagerLeaderRetriever(jobID, jobMasterAddress);
+					highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
 				jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
 			} catch (Exception e) {
 				log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 9e71349..e7f52e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -191,8 +191,7 @@ public class JobLeaderService {
 		LOG.info("Add job {} for job leader monitoring.", jobId);
 
 		final LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
-			jobId,
-			defaultTargetAddress);
+			jobId);
 
 		JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3e88e8c..877812b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
 		LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
 		if (service != null) {
 			return service;

http://git-wip-us.apache.org/repos/asf/flink/blob/106cb9e3/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
new file mode 100644
index 0000000..dd43337
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+/**
+ * Integration test cases for the {@link MiniCluster}.
+ */
+public class MiniClusterITCase extends TestLogger {
+
+//	@Test
+	public void runJobWithSingleRpcService() throws Exception {
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+
+		// should be the default, but set anyways to make sure the test
+		// stays valid when the default changes
+		cfg.setUseSingleRpcService();
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		executeJob(miniCluster);
+	}
+
+//	@Test
+	public void runJobWithMultipleRpcServices() throws Exception {
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+		cfg.setUseRpcServicePerComponent();
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		executeJob(miniCluster);
+	}
+
+//	@Test
+	public void runJobWithMultipleJobManagers() throws Exception {
+		MiniClusterConfiguration cfg = new MiniClusterConfiguration();
+		cfg.setNumJobManagers(3);
+
+		MiniCluster miniCluster = new MiniCluster(cfg);
+		executeJob(miniCluster);
+	}
+
+	private static void executeJob(MiniCluster miniCluster) throws Exception {
+		miniCluster.start();
+
+		JobGraph job = getSimpleJob();
+		miniCluster.runJobBlocking(job);
+	}
+
+	private static JobGraph getSimpleJob() {
+		JobVertex task = new JobVertex("Test task");
+		task.setParallelism(1);
+		task.setMaxParallelism(1);
+		task.setInvokableClass(NoOpInvokable.class);
+
+		return new JobGraph(new JobID(), "Test Job", task);
+	}
+}