You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/08/31 12:31:43 UTC

[07/10] flink git commit: [FLINK-2291] [runtime] Add ZooKeeper support to elect a leader from a set of JobManager. The leader will then be retrieved from ZooKeeper by the TaskManagers.

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RemoteAddressExtension.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RemoteAddressExtension.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RemoteAddressExtension.scala
new file mode 100644
index 0000000..79f63de
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/RemoteAddressExtension.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka
+
+import akka.actor.{Address, ExtensionKey, Extension, ExtendedActorSystem}
+
+/** [[akka.actor.ActorSystem]] [[Extension]] used to obtain the [[Address]] on which the
+  * given ActorSystem is listening.
+  *
+  * @param system
+  */
+class RemoteAddressExtensionImplementation(system: ExtendedActorSystem) extends Extension {
+  def address: Address = system.provider.getDefaultAddress
+}
+
+object RemoteAddressExtension extends ExtensionKey[RemoteAddressExtensionImplementation]{}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e9a31fb..26bf91b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -24,7 +24,8 @@ import java.net.InetSocketAddress
 import java.util.{UUID, Collections}
 
 import akka.actor.Status.Failure
-import akka.actor._
+import akka.actor.{Props, Terminated, PoisonPill, ActorRef, ActorSystem}
+import akka.pattern.ask
 
 import grizzled.slf4j.Logger
 
@@ -36,24 +37,30 @@ import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
+import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService}
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
-import org.apache.flink.runtime.messages.accumulators._
-import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
+
+import org.apache.flink.runtime.messages.accumulators.{AccumulatorResultsErroneous,
+AccumulatorResultsFound, RequestAccumulatorResults, AccumulatorMessage,
+AccumulatorResultStringsFound, RequestAccumulatorResultsStringified}
+import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
+AcknowledgeCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.util.{SerializedThrowable, ZooKeeperUtil, EnvironmentInformation}
+import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.WebMonitor
-import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessages}
+import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessageFilter}
 import org.apache.flink.runtime.LogMessages
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.akka.{ListeningBehaviour, AkkaUtils}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway, InstanceManager}
+import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus}
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -61,7 +68,6 @@ import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
 import org.apache.flink.util.{SerializedValue, ExceptionUtils, InstantiationUtil}
 
-import _root_.akka.pattern.ask
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
@@ -105,38 +111,58 @@ class JobManager(
     protected val defaultExecutionRetries: Int,
     protected val delayBetweenRetries: Long,
     protected val timeout: FiniteDuration,
-    protected val mode: StreamingMode)
-  extends FlinkActor
-  with LeaderSessionMessages // order of the mixin is important, we want filtering after logging
-  with LogMessages // order of the mixin is important, we want first logging
-  {
+    protected val mode: StreamingMode,
+    protected val leaderElectionService: LeaderElectionService)
+  extends FlinkActor 
+  with LeaderSessionMessageFilter // mixin oder is important, we want filtering after logging
+  with LogMessages // mixin order is important, we want first logging
+  with LeaderContender {
 
   override val log = Logger(getClass)
 
   /** List of current jobs running jobs */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
 
-  override val leaderSessionID = Some(UUID.randomUUID())
+  var leaderSessionID: Option[UUID] = None
 
   /**
    * Run when the job manager is started. Simply logs an informational message.
+   * The method also starts the leader election service.
    */
   override def preStart(): Unit = {
-    log.info(s"Starting JobManager at ${self.path.toSerializationFormat}.")
+    log.info(s"Starting JobManager at ${getAddress}.")
+
+    try {
+      leaderElectionService.start(this)
+    } catch {
+      case e: Exception =>
+        log.error("Could not start the JobManager because the leader election service did not " +
+          "start.", e)
+        throw new RuntimeException("Could not start the leader election service.", e)
+    }
   }
 
   override def postStop(): Unit = {
-    log.info(s"Stopping JobManager ${self.path.toSerializationFormat}.")
+    log.info(s"Stopping JobManager ${getAddress}.")
+
+    cancelAndClearEverything(new Exception("The JobManager is shutting down."))
 
     // disconnect the registered task managers
     instanceManager.getAllRegisteredInstances.asScala.foreach {
-      _.getActorGateway().tell(Disconnect("JobManager is shutting down"))
+      _.getActorGateway().tell(
+        Disconnect("JobManager is shutting down"),
+        new AkkaActorGateway(self, leaderSessionID.orNull))
     }
 
-    archive ! decorateMessage(PoisonPill)
+    try {
+      // revoke leadership and stop leader election service
+      leaderElectionService.stop()
+    } catch {
+      case e: Exception => log.error("Could not properly shutdown the leader election service.")
+    }
 
-    for((e,_) <- currentJobs.values) {
-      e.fail(new Exception("The JobManager is shutting down."))
+    if (archive != ActorRef.noSender) {
+      archive ! decorateMessage(PoisonPill)
     }
 
     instanceManager.shutdown()
@@ -158,13 +184,40 @@ class JobManager(
    */
   override def handleMessage: Receive = {
 
+    case GrantLeadership(newLeaderSessionID) =>
+      log.info(s"JobManager ${getAddress} was granted leadership with leader session ID " +
+        s"${newLeaderSessionID}.")
+
+      leaderSessionID = newLeaderSessionID
+
+      // confirming the leader session ID might be blocking, thus do it in a future
+      future{
+        leaderElectionService.confirmLeaderSessionID(newLeaderSessionID.orNull)
+      }(context.dispatcher)
+
+    case RevokeLeadership =>
+      log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.")
+
+      cancelAndClearEverything(new Exception("JobManager is no longer the leader."))
+
+      // disconnect the registered task managers
+      instanceManager.getAllRegisteredInstances.asScala.foreach {
+        _.getActorGateway().tell(
+          Disconnect("JobManager is no longer the leader"),
+          new AkkaActorGateway(self, leaderSessionID.orNull))
+      }
+
+      instanceManager.unregisterAllTaskManagers()
+
+      leaderSessionID = None
+
     case RegisterTaskManager(
-      registrationSessionID,
-      taskManager,
       connectionInfo,
       hardwareInformation,
       numberOfSlots) =>
 
+      val taskManager = sender()
+
       if (instanceManager.isRegistered(taskManager)) {
         val instanceID = instanceManager.getRegisteredInstance(taskManager).getId
 
@@ -172,9 +225,6 @@ class JobManager(
         //            TaskManager actor, but the ask future!
         sender() ! decorateMessage(
           AlreadyRegistered(
-            registrationSessionID,
-            leaderSessionID.get,
-            self,
             instanceID,
             libraryCacheManager.getBlobServerPort)
         )
@@ -186,15 +236,12 @@ class JobManager(
             connectionInfo,
             hardwareInformation,
             numberOfSlots,
-            leaderSessionID)
+            leaderSessionID.orNull)
 
           // IMPORTANT: Send the response to the "sender", which is not the
           //            TaskManager actor, but the ask future!
           sender() ! decorateMessage(
             AcknowledgeRegistration(
-              registrationSessionID,
-              leaderSessionID.get,
-              self,
               instanceID,
               libraryCacheManager.getBlobServerPort)
           )
@@ -212,7 +259,6 @@ class JobManager(
             //            TaskManager actor, but the ask future!
             sender() ! decorateMessage(
               RefuseRegistration(
-                registrationSessionID,
                 ExceptionUtils.stringifyException(e))
             )
         }
@@ -224,8 +270,8 @@ class JobManager(
     case RequestTotalNumberOfSlots =>
       sender ! decorateMessage(instanceManager.getTotalNumberOfSlots)
 
-    case SubmitJob(jobGraph, listen) =>
-      submitJob(jobGraph, listenToEvents = listen)
+    case SubmitJob(jobGraph, listeningBehaviour) =>
+      submitJob(jobGraph, listeningBehaviour)
 
     case CancelJob(jobID) =>
       log.info(s"Trying to cancel job with ID $jobID.")
@@ -334,22 +380,23 @@ class JobManager(
             jobInfo.end = timeStamp
 
             // is the client waiting for the job result?
-            newJobStatus match {
-              case JobStatus.FINISHED =>
-                val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
+            if(jobInfo.client != ActorRef.noSender) {
+              newJobStatus match {
+                case JobStatus.FINISHED =>
+                  val accumulatorResults: java.util.Map[String, SerializedValue[AnyRef]] = try {
                   executionGraph.getAccumulatorsSerialized()
-                } catch {
-                  case e: Exception =>
-                    log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
-                    Collections.emptyMap()
-                }
+                  } catch {
+                    case e: Exception =>
+                      log.error(s"Cannot fetch serialized accumulators for job $jobID", e)
+                      Collections.emptyMap()
+                  }
                 val result = new SerializedJobExecutionResult(
                   jobID,
                   jobInfo.duration,
                   accumulatorResults)
-                jobInfo.client ! decorateMessage(JobResultSuccess(result))
+                  jobInfo.client ! decorateMessage(JobResultSuccess(result))
 
-              case JobStatus.CANCELED =>
+                case JobStatus.CANCELED =>
                 // the error may be packed as a serialized throwable
                 val unpackedError = SerializedThrowable.get(
                   error, executionGraph.getUserClassLoader())
@@ -358,7 +405,7 @@ class JobManager(
                   new SerializedThrowable(
                     new JobCancellationException(jobID, "Job was cancelled.", unpackedError))))
 
-              case JobStatus.FAILED =>
+                case JobStatus.FAILED =>
                 val unpackedError = SerializedThrowable.get(
                   error, executionGraph.getUserClassLoader())
                 
@@ -366,11 +413,12 @@ class JobManager(
                   new SerializedThrowable(
                     new JobExecutionException(jobID, "Job execution failed.", unpackedError))))
 
-              case x =>
+                case x =>
                 val exception = new JobExecutionException(jobID, s"$x is not a terminal state.")
                 jobInfo.client ! decorateMessage(JobResultFailure(
                   new SerializedThrowable(exception)))
-                throw exception
+                  throw exception
+              }
             }
 
             removeJob(jobID)
@@ -463,6 +511,9 @@ class JobManager(
     case RequestBlobManagerPort =>
       sender ! decorateMessage(libraryCacheManager.getBlobServerPort)
 
+    case RequestArchive =>
+      sender ! decorateMessage(ResponseArchive(archive))
+
     case RequestRegisteredTaskManagers =>
       import scala.collection.JavaConverters._
       sender ! decorateMessage(
@@ -486,13 +537,13 @@ class JobManager(
 
     case RequestStackTrace(instanceID) =>
       val gateway = instanceManager.getRegisteredInstanceById(instanceID).getActorGateway
-      gateway.forward(SendStackTrace, new AkkaActorGateway(sender(), leaderSessionID))
+      gateway.forward(SendStackTrace, new AkkaActorGateway(sender, leaderSessionID.orNull))
 
     case Terminated(taskManager) =>
       if (instanceManager.isRegistered(taskManager)) {
         log.info(s"Task manager ${taskManager.path} terminated.")
 
-        instanceManager.unregisterTaskManager(taskManager)
+        instanceManager.unregisterTaskManager(taskManager, true)
         context.unwatch(taskManager)
       }
 
@@ -505,12 +556,12 @@ class JobManager(
       if (instanceManager.isRegistered(taskManager)) {
         log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.")
 
-        instanceManager.unregisterTaskManager(taskManager)
+        instanceManager.unregisterTaskManager(taskManager, false)
         context.unwatch(taskManager)
       }
 
     case RequestLeaderSessionID =>
-      sender() ! ResponseLeaderSessionID(leaderSessionID)
+      sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
   }
 
   /**
@@ -519,10 +570,9 @@ class JobManager(
    * graph and the execution vertices are queued for scheduling.
    *
    * @param jobGraph representing the Flink job
-   * @param listenToEvents true if the sender wants to listen to job status and execution state
-   *                       change notifications. false if not.
+   * @param listeningBehaviour specifies the listening behaviour of the sender.
    */
-  private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean): Unit = {
+  private def submitJob(jobGraph: JobGraph, listeningBehaviour: ListeningBehaviour): Unit = {
     if (jobGraph == null) {
       sender() ! decorateMessage(JobResultFailure(
         new SerializedThrowable(
@@ -560,6 +610,14 @@ class JobManager(
           throw new JobSubmissionException(jobId, "The given job is empty")
         }
 
+        val client = if(listeningBehaviour == ListeningBehaviour.DETACHED) {
+          // The client does not want to receive the SerializedJobExecutionResult
+          ActorRef.noSender
+        } else {
+          // Send the job execution result back to the sender
+          sender
+        }
+
         // see if there already exists an ExecutionGraph for the corresponding job ID
         executionGraph = currentJobs.getOrElseUpdate(
           jobGraph.getJobID,
@@ -571,7 +629,7 @@ class JobManager(
             timeout,
             jobGraph.getUserJarBlobKeys(),
             userCodeLoader),
-            JobInfo(sender(), System.currentTimeMillis())
+            JobInfo(client, System.currentTimeMillis())
           )
         )._1
 
@@ -603,52 +661,53 @@ class JobManager(
               s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.")
           }
 
-          if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
-            vertex.setParallelism(numSlots)
-          }
+              if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+                vertex.setParallelism(numSlots)
+              }
 
-          try {
-            vertex.initializeOnMaster(userCodeLoader)
-          }
-          catch {
+              try {
+                vertex.initializeOnMaster(userCodeLoader)
+              }
+              catch {
             case t: Throwable =>
               throw new JobExecutionException(jobId,
                 "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage, t)
-          }
-        }
+              }
+            }
 
-        // topologically sort the job vertices and attach the graph to the existing one
-        val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
-        if (log.isDebugEnabled) {
-          log.debug(s"Adding ${sortedTopology.size()} vertices from " +
-            s"job graph ${jobId} (${jobName}).")
-        }
-        executionGraph.attachJobGraph(sortedTopology)
+            // topologically sort the job vertices and attach the graph to the existing one
+            val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
+            if (log.isDebugEnabled) {
+              log.debug(s"Adding ${sortedTopology.size()} vertices from " +
+                s"job graph ${jobId} (${jobName}).")
+            }
+            executionGraph.attachJobGraph(sortedTopology)
 
-        if (log.isDebugEnabled) {
-          log.debug(s"Successfully created execution graph from job graph ${jobId} (${jobName}).")
-        }
+            if (log.isDebugEnabled) {
+              log.debug("Successfully created execution graph from job " +
+                s"graph ${jobId} (${jobName}).")
+            }
 
-        // configure the state checkpointing
-        val snapshotSettings = jobGraph.getSnapshotSettings
-        if (snapshotSettings != null) {
+            // configure the state checkpointing
+            val snapshotSettings = jobGraph.getSnapshotSettings
+            if (snapshotSettings != null) {
 
-          val idToVertex: JobVertexID => ExecutionJobVertex = id => {
-            val vertex = executionGraph.getJobVertex(id)
-            if (vertex == null) {
-              throw new JobSubmissionException(jobId,
-                "The snapshot checkpointing settings refer to non-existent vertex " + id)
-            }
-            vertex
-          }
+              val idToVertex: JobVertexID => ExecutionJobVertex = id => {
+                val vertex = executionGraph.getJobVertex(id)
+                if (vertex == null) {
+                  throw new JobSubmissionException(jobId,
+                    "The snapshot checkpointing settings refer to non-existent vertex " + id)
+                }
+                vertex
+              }
 
-          val triggerVertices: java.util.List[ExecutionJobVertex] =
+              val triggerVertices: java.util.List[ExecutionJobVertex] =
             snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava
 
-          val ackVertices: java.util.List[ExecutionJobVertex] =
+              val ackVertices: java.util.List[ExecutionJobVertex] =
             snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava
 
-          val confirmVertices: java.util.List[ExecutionJobVertex] =
+              val confirmVertices: java.util.List[ExecutionJobVertex] =
             snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava
 
           executionGraph.enableSnapshotCheckpointing(
@@ -658,15 +717,16 @@ class JobManager(
             ackVertices,
             confirmVertices,
             context.system,
-            leaderSessionID)
+            leaderSessionID.orNull)
         }
 
         // get notified about job status changes
-        executionGraph.registerJobStatusListener(new AkkaActorGateway(self, leaderSessionID))
+        executionGraph.registerJobStatusListener(
+          new AkkaActorGateway(self, leaderSessionID.orNull))
 
-        if (listenToEvents) {
+        if (listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
           // the sender wants to be notified about state changes
-          val gateway = new AkkaActorGateway(sender(), leaderSessionID)
+          val gateway = new AkkaActorGateway(sender(), leaderSessionID.orNull)
 
           executionGraph.registerExecutionListener(gateway)
           executionGraph.registerJobStatusListener(gateway)
@@ -929,6 +989,53 @@ class JobManager(
     }
   }
 
+  /** Fails all currently running jobs and empties the list of currently running jobs. If the
+    * [[JobClientActor]] waits for a result, then a [[JobExecutionException]] is sent.
+    *
+    * @param cause Cause for the cancelling.
+    */
+  private def cancelAndClearEverything(cause: Throwable) {
+    for((jobID, (eg, jobInfo)) <- currentJobs) {
+      eg.fail(cause)
+
+      if(jobInfo.client != ActorRef.noSender) {
+        jobInfo.client ! decorateMessage(
+          Failure(
+            new JobExecutionException(
+              jobID,
+              "All jobs are cancelled and cleared.",
+              cause)
+          ))
+      }
+    }
+
+    currentJobs.clear()
+  }
+
+  override def grantLeadership(newLeaderSessionID: UUID): Unit = {
+    self ! decorateMessage(GrantLeadership(Option(newLeaderSessionID)))
+  }
+
+  override def revokeLeadership(): Unit = {
+    leaderSessionID = None
+    self ! decorateMessage(RevokeLeadership)
+  }
+
+  override def getAddress: String = {
+    AkkaUtils.getAkkaURL(context.system, self)
+  }
+
+  /** Handles error occuring in the leader election service
+    *
+    * @param exception
+    */
+  override def handleError(exception: Exception): Unit = {
+    log.error("Received an error from the LeaderElectionService.", exception)
+
+    // terminate JobManager in case of an error
+    self ! decorateMessage(PoisonPill)
+  }
+  
   /**
    * Updates the accumulators reported from a task manager via the Heartbeat message.
    * @param accumulators list of accumulator snapshots
@@ -1001,12 +1108,22 @@ object JobManager {
       System.exit(STARTUP_FAILURE_RETURN_CODE)
     }
 
-    // address and will not be reachable from anyone remote
-    if (listeningPort <= 0 || listeningPort >= 65536) {
-      val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
-        "' is invalid, it must be great than 0 and less than 65536."
-      LOG.error(message)
-      System.exit(STARTUP_FAILURE_RETURN_CODE)
+    if (ZooKeeperUtils.isZooKeeperHighAvailabilityEnabled(configuration)) {
+      // address and will not be reachable from anyone remote
+      if (listeningPort != 0) {
+        val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
+          "' is invalid, it must be equal to 0."
+        LOG.error(message)
+        System.exit(STARTUP_FAILURE_RETURN_CODE)
+      }
+    } else {
+      // address and will not be reachable from anyone remote
+      if (listeningPort <= 0 || listeningPort >= 65536) {
+        val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
+          "' is invalid, it must be greater than 0 and less than 65536."
+        LOG.error(message)
+        System.exit(STARTUP_FAILURE_RETURN_CODE)
+      }
     }
 
     // run the job manager
@@ -1089,7 +1206,7 @@ object JobManager {
     try {
       // bring up the job manager actor
       LOG.info("Starting JobManager actor")
-      val (jobManager, archiver) = startJobManagerActors(
+      val (jobManager, archive) = startJobManagerActors(
         configuration,
         jobManagerSystem,
         streamingMode)
@@ -1114,7 +1231,7 @@ object JobManager {
           jobManagerSystem,
           listeningAddress,
           Some(TaskManager.TASK_MANAGER_NAME),
-          Some(jobManager.path.toString),
+          None,
           true,
           streamingMode,
           classOf[TaskManager])
@@ -1130,9 +1247,15 @@ object JobManager {
       }
 
       if(configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
-        val lookupTimeout = AkkaUtils.getLookupTimeout(configuration)
-        val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout)
-        val archiverGateway = new AkkaActorGateway(archiver, jobManagerGateway.leaderSessionID())
+
+        // TODO: Add support for HA. Webserver has to work in dedicated mode. All transferred
+        // information has to be made serializable
+        val address = AkkaUtils.getAddress(jobManagerSystem)
+
+        configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.host.get)
+        configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.port.get)
+
+        val leaderRetrievalService = StandaloneUtils.createLeaderRetrievalService(configuration)
 
         // start the job manager web frontend
         val webServer = if (
@@ -1143,14 +1266,16 @@ object JobManager {
           LOG.info("Starting NEW JobManger web frontend")
           // start the new web frontend. we need to load this dynamically
           // because it is not in the same project/dependencies
-          startWebRuntimeMonitor(configuration, jobManagerGateway, archiverGateway)
+          startWebRuntimeMonitor(configuration, leaderRetrievalService, jobManagerSystem)
         }
         else {
           LOG.info("Starting JobManger web frontend")
-          new WebInfoServer(configuration, jobManagerGateway, archiverGateway)
+          new WebInfoServer(configuration, leaderRetrievalService, jobManagerSystem)
         }
 
-        webServer.start()
+        if(webServer != null) {
+          webServer.start()
+        }
       }
     }
     catch {
@@ -1208,6 +1333,13 @@ object JobManager {
       } text {
         "Network address for communication with the job manager"
       }
+
+      opt[Int]("webui-port").optional().action { (arg, conf) =>
+        conf.setWebUIPort(arg)
+        conf
+      } text {
+        "Port for the UI web server"
+      }
     }
 
     val config = parser.parse(args, new JobManagerCliOptions()).getOrElse {
@@ -1232,42 +1364,38 @@ object JobManager {
       configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
     }
 
+    if (config.getWebUIPort() >= 0) {
+      configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, config.getWebUIPort())
+    }
+
+    if (config.getHost() != null) {
+      configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, config.getHost())
+    }
+
+    val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
+
     // high availability mode
-    val (hostname: String, port: Int ) = 
-      if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
-        // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is
-        // chosen.  For the FlinkMiniCluster you have to choose it on your own.
+    val port: Int =
+      if (ZooKeeperUtils.isZooKeeperHighAvailabilityEnabled(configuration)) {
         LOG.info("Starting JobManager in High-Availability Mode")
-  
-        if (config.getHost() == null) {
-          throw new Exception("Missing parameter '--host'. Parameter is required when " +
-            "running in high-availability mode")
-        }
-  
-        // Let web server listen on random port
-        configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
-  
-        (config.getHost(), 0)
+
+        configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
+        0
       }
       else {
         LOG.info("Staring JobManager without high-availability")
-        
-        if (config.getHost() != null) {
-          throw new Exception("Found an explicit address for JobManager communication " +
-            "via the CLI option '--host'.\n" +
-            "This parameter must only be set if the JobManager is started in high-availability " +
-            "mode and connects to a ZooKeeper quorum.\n" +
-            "Please configure ZooKeeper or don't set the '--host' option, so that the JobManager " +
-            "uses the address configured under 'conf/flink-conf.yaml'.")
-        }
   
-        val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-        val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+        configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
             ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
-        (host, port)
       }
 
-    (configuration, config.getJobManagerMode(), config.getStreamingMode(), hostname, port)
+    val executionMode = config.getJobManagerMode
+    val streamingMode = config.getStreamingMode
+
+    LOG.info(s"Starting JobManager on $host:$port with execution mode $executionMode and " +
+      s"streaming mode $streamingMode")
+
+    (configuration, executionMode, streamingMode, host, port)
   }
 
   /**
@@ -1278,9 +1406,17 @@ object JobManager {
    * @param configuration The configuration from which to parse the config values.
    * @return The members for a default JobManager.
    */
-  def createJobManagerComponents(configuration: Configuration)
-    : (ExecutionContext, InstanceManager, FlinkScheduler, BlobLibraryCacheManager,
-      Props, Int, Long, FiniteDuration, Int) = {
+  def createJobManagerComponents(configuration: Configuration) :
+    (ExecutionContext,
+    InstanceManager,
+    FlinkScheduler,
+    BlobLibraryCacheManager,
+    Props,
+    Int, // execution retries
+    Long, // delay between retries
+    FiniteDuration, // timeout
+    Int, // number of archived jobs
+    LeaderElectionService) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
@@ -1346,6 +1482,8 @@ object JobManager {
       }
     }
 
+    val leaderElectionService = LeaderElectionUtils.createLeaderElectionService(configuration)
+
     (executionContext,
       instanceManager,
       scheduler,
@@ -1353,8 +1491,9 @@ object JobManager {
       archiveProps,
       executionRetries,
       delayBetweenRetries,
-      timeout,
-      archiveCount)
+      timeout, 
+      archiveCount, 
+      leaderElectionService)
   }
 
   /**
@@ -1386,7 +1525,7 @@ object JobManager {
    * @param actorSystem The actor system running the JobManager
    * @param jobMangerActorName Optionally the name of the JobManager actor. If none is given,
    *                          the actor will have the name generated by the actor system.
-   * @param archiverActorName Optionally the name of the archive actor. If none is given,
+   * @param archiveActorName Optionally the name of the archive actor. If none is given,
    *                          the actor will have the name generated by the actor system.
    * @param streamingMode The mode to run the system in (streaming vs. batch-only)
    * 
@@ -1396,7 +1535,7 @@ object JobManager {
       configuration: Configuration,
       actorSystem: ActorSystem,
       jobMangerActorName: Option[String],
-      archiverActorName: Option[String],
+      archiveActorName: Option[String],
       streamingMode: StreamingMode)
     : (ActorRef, ActorRef) = {
 
@@ -1408,10 +1547,11 @@ object JobManager {
       executionRetries,
       delayBetweenRetries,
       timeout,
-      _) = createJobManagerComponents(configuration)
+      _,
+      leaderElectionService) = createJobManagerComponents(configuration)
 
     // start the archiver with the given name, or without (avoid name conflicts)
-    val archiver: ActorRef = archiverActorName match {
+    val archive: ActorRef = archiveActorName match {
       case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
       case None => actorSystem.actorOf(archiveProps)
     }
@@ -1423,18 +1563,19 @@ object JobManager {
       instanceManager,
       scheduler,
       libraryCacheManager,
-      archiver,
+      archive,
       executionRetries,
       delayBetweenRetries,
       timeout,
-      streamingMode)
+      streamingMode,
+      leaderElectionService)
 
     val jobManager: ActorRef = jobMangerActorName match {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
       case None => actorSystem.actorOf(jobManagerProps)
     }
 
-    (jobManager, archiver)
+    (jobManager, archive)
   }
 
   def startActor(props: Props, actorSystem: ActorSystem): ActorRef = {
@@ -1452,9 +1593,13 @@ object JobManager {
    * @param address The address of the JobManager's actor system.
    * @return The akka URL of the JobManager actor.
    */
-  def getRemoteJobManagerAkkaURL(address: InetSocketAddress): String = {
+  def getRemoteJobManagerAkkaURL(
+      address: InetSocketAddress,
+      name: Option[String] = None)
+    : String = {
     val hostPort = address.getAddress().getHostAddress() + ":" + address.getPort()
-    s"akka.tcp://flink@$hostPort/user/$JOB_MANAGER_NAME"
+
+    getJobManagerAkkaURLHelper(s"akka.tcp://flink@$hostPort", name)
   }
 
   /**
@@ -1463,17 +1608,24 @@ object JobManager {
    *
    * @return The local akka URL of the JobManager actor.
    */
-  def getLocalJobManagerAkkaURL: String = {
-    "akka://flink/user/" + JOB_MANAGER_NAME
+  def getLocalJobManagerAkkaURL(name: Option[String] = None): String = {
+    getJobManagerAkkaURLHelper("akka://flink", name)
+  }
+
+  def getJobManagerAkkaURL(system: ActorSystem, name: Option[String] = None): String = {
+    getJobManagerAkkaURLHelper(AkkaUtils.getAddress(system).toString, name)
   }
 
-  def getJobManagerRemoteReferenceFuture(
+  private def getJobManagerAkkaURLHelper(address: String, name: Option[String]): String = {
+    address + "/user/" + name.getOrElse(JOB_MANAGER_NAME)
+  }
+
+  def getJobManagerActorRefFuture(
       address: InetSocketAddress,
       system: ActorSystem,
       timeout: FiniteDuration)
     : Future[ActorRef] = {
-
-    AkkaUtils.getReference(getRemoteJobManagerAkkaURL(address), system, timeout)
+    AkkaUtils.getActorRefFuture(getRemoteJobManagerAkkaURL(address), system, timeout)
   }
 
   /**
@@ -1486,25 +1638,12 @@ object JobManager {
    * @return The ActorRef to the JobManager
    */
   @throws(classOf[IOException])
-  def getJobManagerRemoteReference(
+  def getJobManagerActorRef(
       jobManagerUrl: String,
       system: ActorSystem,
       timeout: FiniteDuration)
     : ActorRef = {
-
-    try {
-      val future = AkkaUtils.getReference(jobManagerUrl, system, timeout)
-      Await.result(future, timeout)
-    }
-    catch {
-      case e @ (_ : ActorNotFound | _ : TimeoutException) =>
-        throw new IOException(
-          s"JobManager at $jobManagerUrl not reachable. " +
-            s"Please make sure that the JobManager is running and its port is reachable.", e)
-
-      case e: IOException =>
-        throw new IOException("Could not connect to JobManager at " + jobManagerUrl, e)
-    }
+    AkkaUtils.getActorRef(jobManagerUrl, system, timeout)
   }
 
   /**
@@ -1517,14 +1656,14 @@ object JobManager {
    * @return The ActorRef to the JobManager
    */
   @throws(classOf[IOException])
-  def getJobManagerRemoteReference(
+  def getJobManagerActorRef(
       address: InetSocketAddress,
       system: ActorSystem,
       timeout: FiniteDuration)
     : ActorRef = {
 
     val jmAddress = getRemoteJobManagerAkkaURL(address)
-    getJobManagerRemoteReference(jmAddress, system, timeout)
+    getJobManagerActorRef(jmAddress, system, timeout)
   }
 
   /**
@@ -1537,37 +1676,16 @@ object JobManager {
    * @return The ActorRef to the JobManager
    */
   @throws(classOf[IOException])
-  def getJobManagerRemoteReference(
+  def getJobManagerActorRef(
       address: InetSocketAddress,
       system: ActorSystem,
       config: Configuration)
     : ActorRef = {
 
     val timeout = AkkaUtils.getLookupTimeout(config)
-    getJobManagerRemoteReference(address, system, timeout)
+    getJobManagerActorRef(address, system, timeout)
   }
 
-  /** Returns the [[ActorGateway]] for the provided JobManager. The function automatically
-    * retrieves the current leader session ID from the JobManager and instantiates the
-    * [[AkkaActorGateway]] with it.
-    *
-    * @param jobManager ActorRef to the [[JobManager]]
-    * @param timeout Timeout for the blocking leader session ID retrieval
-    * @throws java.lang.Exception
-    * @return Gateway to the specified JobManager
-    */
-  @throws(classOf[Exception])
-  def getJobManagerGateway(
-    jobManager: ActorRef,
-    timeout: FiniteDuration
-    ): ActorGateway = {
-    val futureLeaderSessionID = (jobManager ? RequestLeaderSessionID)(timeout)
-      .mapTo[ResponseLeaderSessionID]
-
-    val leaderSessionID = Await.result(futureLeaderSessionID, timeout).leaderSessionID
-
-    new AkkaActorGateway(jobManager, leaderSessionID)
-  }
 
   // --------------------------------------------------------------------------
   //  Utilities
@@ -1581,29 +1699,28 @@ object JobManager {
    * this method does not throw any exceptions, but only logs them.
    * 
    * @param config The configuration for the runtime monitor.
-   * @param jobManager The JobManager actor gateway.
-   * @param archiver The execution graph archive actor.
+   * @param leaderRetrievalService Leader retrieval service to get the leading JobManager
    */
   def startWebRuntimeMonitor(
       config: Configuration,
-      jobManager: ActorGateway,
-      archiver: ActorGateway)
+      leaderRetrievalService: LeaderRetrievalService,
+      actorSystem: ActorSystem)
     : WebMonitor = {
     // try to load and instantiate the class
     try {
       val classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"
       val clazz: Class[_ <: WebMonitor] = Class.forName(classname)
-                                               .asSubclass(classOf[WebMonitor])
+        .asSubclass(classOf[WebMonitor])
 
       val ctor: Constructor[_ <: WebMonitor] = clazz.getConstructor(classOf[Configuration],
-                                                                    classOf[ActorGateway],
-                                                                    classOf[ActorGateway])
-      ctor.newInstance(config, jobManager, archiver)
+        classOf[LeaderRetrievalService],
+        classOf[ActorSystem])
+      ctor.newInstance(config, leaderRetrievalService, actorSystem)
     }
     catch {
       case e: ClassNotFoundException =>
         LOG.error("Could not load web runtime monitor. " +
-            "Probably reason: flink-runtime-web is not in the classpath")
+          "Probably reason: flink-runtime-web is not in the classpath")
         LOG.debug("Caught exception", e)
         null
       case e: InvocationTargetException =>

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index e2891de..702e34b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -71,6 +71,10 @@ class MemoryArchivist(private val max_entries: Int)
   var canceledCnt: Int = 0
   var failedCnt: Int = 0
 
+  override def preStart(): Unit = {
+    log.info(s"Started memory archivist ${self.path}")
+  }
+
   override def handleMessage: Receive = {
     
     /* Receive Execution Graph to archive */

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
index 83bafaa..2369d3c 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.messages
 
 import java.text.SimpleDateFormat
-import java.util.Date
+import java.util.{UUID, Date}
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.execution.ExecutionState

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 1c250af..d7bbb8d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.messages
 
 import java.util.UUID
 
+import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.client.{SerializedJobExecutionResult, JobStatusMessage}
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.{InstanceID, Instance}
@@ -35,17 +37,32 @@ import scala.collection.JavaConverters._
  */
 object JobManagerMessages {
 
-  case class LeaderSessionMessage(leaderSessionID: Option[UUID], message: Any)
+  /** Wrapper class for leader session messages. Leader session messages implement the
+    * [[RequiresLeaderSessionID]] interface and have to be wrapped in a [[LeaderSessionMessage]],
+    * which also contains the current leader session ID.
+    *
+    * @param leaderSessionID Current leader session ID or null, if no leader session ID was set
+    * @param message [[RequiresLeaderSessionID]] message to be wrapped in a [[LeaderSessionMessage]]
+    */
+  case class LeaderSessionMessage(leaderSessionID: UUID, message: Any)
 
   /**
-   * Submits a job to the job manager. If [[registerForEvents]] is true,
-   * then the sender will be registered as listener for the state change messages.
+   * Submits a job to the job manager. Depending on the [[listeningBehaviour]],
+   * the sender registers for different messages. If [[ListeningBehaviour.DETACHED]], then
+   * it will only be informed whether the submission was successful or not. If
+   * [[ListeningBehaviour.EXECUTION_RESULT]], then it will additionally receive the execution
+   * result. If [[ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES]], then it will additionally
+   * receive the job status change notifications.
+   *
    * The submission result will be sent back to the sender as a success message.
    *
    * @param jobGraph The job to be submitted to the JobManager
-   * @param registerForEvents if true, then register for state change events
+   * @param listeningBehaviour Specifies to what the sender wants to listen (detached, execution
+   *                           result, execution result and state changes)
    */
-  case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean)
+  case class SubmitJob(
+      jobGraph: JobGraph,
+      listeningBehaviour: ListeningBehaviour)
     extends RequiresLeaderSessionID
 
   /**
@@ -161,7 +178,7 @@ object JobManagerMessages {
     *
     * @param leaderSessionID
     */
-  case class ResponseLeaderSessionID(leaderSessionID: Option[UUID])
+  case class ResponseLeaderSessionID(leaderSessionID: UUID)
 
   /**
    * Denotes a successful job submission.
@@ -299,6 +316,22 @@ object JobManagerMessages {
 
   case object JobManagerStatusAlive extends JobManagerStatus
 
+  /** Grants leadership to the receiver. The message contains the new leader session id.
+    *
+     * @param leaderSessionID
+    */
+  case class GrantLeadership(leaderSessionID: Option[UUID])
+
+  /** Revokes leadership of the receiver.
+    */
+  case object RevokeLeadership
+
+  /** Requests the ActorRef of the archiver */
+  case object RequestArchive
+
+  /** Response containing the ActorRef of the archiver */
+  case class ResponseArchive(actor: ActorRef)
+
   // --------------------------------------------------------------------------
   // Utility methods to allow simpler case object access from Java
   // --------------------------------------------------------------------------
@@ -338,4 +371,8 @@ object JobManagerMessages {
   def getRequestLeaderSessionID: AnyRef = {
     RequestLeaderSessionID
   }
+
+  def getRequestArchive: AnyRef = {
+    RequestArchive
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
index b435ebc..941d63f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/RegistrationMessages.scala
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.messages
 
-import java.util.UUID
-
-import akka.actor.ActorRef
 import org.apache.flink.runtime.instance.{InstanceConnectionInfo, InstanceID, HardwareDescription}
 
 import scala.concurrent.duration.{Deadline, FiniteDuration}
@@ -34,21 +31,18 @@ object RegistrationMessages {
   /**
    * Marker trait for registration messages.
    */
-  trait RegistrationMessage {
-    def registrationSessionID: UUID
-  }
+  trait RegistrationMessage extends RequiresLeaderSessionID {}
 
   /**
    * Triggers the TaskManager to attempt a registration at the JobManager.
    *
-   * @param jobManagerAkkaURL The actor URL of the JobManager.
+   * @param jobManagerURL Akka URL to the JobManager
    * @param timeout The timeout for the message. The next retry will double this timeout.
    * @param deadline Optional deadline until when the registration must be completed.
    * @param attempt The attempt number, for logging.
    */
   case class TriggerTaskManagerRegistration(
-      registrationSessionID: UUID,
-      jobManagerAkkaURL: String,
+      jobManagerURL: String,
       timeout: FiniteDuration,
       deadline: Option[Deadline],
       attempt: Int)
@@ -58,14 +52,11 @@ object RegistrationMessages {
    * Registers a task manager at the job manager. A successful registration is acknowledged by
    * [[AcknowledgeRegistration]].
    *
-   * @param taskManager The TaskManager actor.
    * @param connectionInfo The TaskManagers connection information.
    * @param resources The TaskManagers resources.
    * @param numberOfSlots The number of processing slots offered by the TaskManager.
    */
   case class RegisterTaskManager(
-      registrationSessionID: UUID,
-      taskManager: ActorRef,
       connectionInfo: InstanceConnectionInfo,
       resources: HardwareDescription,
       numberOfSlots: Int)
@@ -80,9 +71,6 @@ object RegistrationMessages {
    * @param blobPort The server port where the JobManager's BLOB service runs.
    */
   case class AcknowledgeRegistration(
-      registrationSessionID: UUID,
-      leaderSessionID: UUID,
-      jobManager: ActorRef,
       instanceID: InstanceID,
       blobPort: Int)
     extends RegistrationMessage
@@ -94,9 +82,6 @@ object RegistrationMessages {
    * @param blobPort The server port where the JobManager's BLOB service runs.
    */
   case class AlreadyRegistered(
-      registrationSessionID: UUID,
-      leaderSessionID: UUID,
-      jobManager: ActorRef,
       instanceID: InstanceID,
       blobPort: Int)
     extends RegistrationMessage
@@ -107,6 +92,6 @@ object RegistrationMessages {
    *
    * @param reason Reason why the task manager registration was refused
    */
-  case class RefuseRegistration(registrationSessionID: UUID, reason: String)
+  case class RefuseRegistration(reason: String)
     extends RegistrationMessage
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
new file mode 100644
index 0000000..a80ca99
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages
+
+import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, IntermediateResultPartitionID}
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState
+import org.apache.flink.runtime.taskmanager.TaskExecutionState
+
+/**
+ * A set of messages that control the deployment and the state of Tasks executed
+ * on the TaskManager.
+ */
+object TaskMessages {
+
+  /**
+   * Marker trait for task messages.
+   */
+  trait TaskMessage
+
+  // --------------------------------------------------------------------------
+  //  Starting and stopping Tasks
+  // --------------------------------------------------------------------------
+
+  /**
+   * Submits a task to the task manager. The result is to this message is a
+   * [[TaskOperationResult]] message.
+   *
+   * @param tasks Descriptor which contains the information to start the task.
+   */
+  case class SubmitTask(tasks: TaskDeploymentDescriptor)
+    extends TaskMessage with RequiresLeaderSessionID
+
+  /**
+   * Cancels the task associated with [[attemptID]]. The result is sent back to the sender as a
+   * [[TaskOperationResult]] message.
+   *
+   * @param attemptID The task's execution attempt ID.
+   */
+  case class CancelTask(attemptID: ExecutionAttemptID)
+    extends TaskMessage with RequiresLeaderSessionID
+
+  /**
+   * Triggers a fail of specified task from the outside (as opposed to the task throwing
+   * an exception itself) with the given exception as the cause.
+   *
+   * @param executionID The task's execution attempt ID.
+   * @param cause The reason for the external failure.
+   */
+  case class FailTask(executionID: ExecutionAttemptID, cause: Throwable)
+    extends TaskMessage
+
+  /**
+   * Notifies the TaskManager that the task has reached its final state,
+   * either FINISHED, CANCELED, or FAILED.
+   *
+   * @param executionID The task's execution attempt ID.
+   */
+  case class TaskInFinalState(executionID: ExecutionAttemptID)
+    extends TaskMessage
+
+
+  // --------------------------------------------------------------------------
+  //  Updates to Intermediate Results
+  // --------------------------------------------------------------------------
+
+  /**
+   * Answer to a [[RequestPartitionState]] with the state of the respective partition.
+   */
+  case class PartitionState(
+      taskExecutionId: ExecutionAttemptID,
+      taskResultId: IntermediateDataSetID,
+      partitionId: IntermediateResultPartitionID,
+      state: ExecutionState)
+    extends TaskMessage with RequiresLeaderSessionID
+
+  /**
+   * Base class for messages that update the information about location of input partitions
+   */
+  abstract sealed class UpdatePartitionInfo extends TaskMessage with RequiresLeaderSessionID {
+    def executionID: ExecutionAttemptID
+  }
+
+  /**
+   *
+   * @param executionID The task's execution attempt ID.
+   * @param resultId The input reader to update.
+   * @param partitionInfo The partition info update.
+   */
+  case class UpdateTaskSinglePartitionInfo(
+      executionID: ExecutionAttemptID,
+      resultId: IntermediateDataSetID,
+      partitionInfo: InputChannelDeploymentDescriptor)
+    extends UpdatePartitionInfo
+
+  /**
+   *
+   * @param executionID The task's execution attempt ID.
+   * @param partitionInfos List of input gates with channel descriptors to update.
+   */
+  case class UpdateTaskMultiplePartitionInfos(
+      executionID: ExecutionAttemptID,
+      partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
+    extends UpdatePartitionInfo
+
+  /**
+   * Fails (and releases) all intermediate result partitions identified by
+   * [[executionID]] from the task manager.
+   *
+   * @param executionID The task's execution attempt ID.
+   */
+  case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID)
+    extends TaskMessage with RequiresLeaderSessionID
+
+
+  // --------------------------------------------------------------------------
+  //  Report Messages
+  // --------------------------------------------------------------------------
+
+  /**
+   * Denotes a state change of a task at the JobManager. The update success is acknowledged by a
+   * boolean value which is sent back to the sender.
+   *
+   * @param taskExecutionState The changed task state
+   */
+  case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
+    extends TaskMessage with RequiresLeaderSessionID
+
+  /**
+   * Response message to updates in the task state. Send for example as a response to
+   *
+   *  - [[SubmitTask]]
+   *  - [[CancelTask]]
+   *
+   * @param executionID identifying the respective task
+   * @param success indicating whether the operation has been successful
+   * @param description Optional description for unsuccessful results.
+   */
+  case class TaskOperationResult(
+      executionID: ExecutionAttemptID,
+      success: Boolean,
+      description: String)
+    extends TaskMessage {
+    def this(executionID: ExecutionAttemptID, success: Boolean) = this(executionID, success, "")
+  }
+
+
+  // --------------------------------------------------------------------------
+  //  Utility Functions
+  // --------------------------------------------------------------------------
+
+  def createUpdateTaskMultiplePartitionInfos(
+      executionID: ExecutionAttemptID,
+      resultIDs: java.util.List[IntermediateDataSetID],
+      partitionInfos: java.util.List[InputChannelDeploymentDescriptor])
+    : UpdateTaskMultiplePartitionInfos = {
+
+    require(resultIDs.size() == partitionInfos.size(),
+      "ResultIDs must have the same length as partitionInfos.")
+
+    import scala.collection.JavaConverters.asScalaBufferConverter
+
+    new UpdateTaskMultiplePartitionInfos(executionID,
+      resultIDs.asScala.zip(partitionInfos.asScala))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index 6cb571c..30c82fe 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.messages
 
+import java.util.UUID
+
+import akka.actor.ActorRef
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.instance.InstanceID
 
@@ -92,17 +95,25 @@ object TaskManagerMessages {
 
   /**
    * Requests a notification from the task manager as soon as the task manager has been
-   * registered at the job manager. Once the task manager is registered at the job manager a
+   * registered at any job manager. Once the task manager is registered at any job manager a
    * [[RegisteredAtJobManager]] message will be sent to the sender.
    */
-  case object NotifyWhenRegisteredAtJobManager
+  case object NotifyWhenRegisteredAtAnyJobManager
 
   /**
-   * Acknowledges that the task manager has been successfully registered at the job manager. This
-   * message is a response to [[NotifyWhenRegisteredAtJobManager]].
+   * Acknowledges that the task manager has been successfully registered at any job manager. This
+   * message is a response to [[NotifyWhenRegisteredAtAnyJobManager]].
    */
   case object RegisteredAtJobManager
 
+  /** Tells the address of the new leading [[org.apache.flink.runtime.jobmanager.JobManager]]
+    * and the new leader session ID.
+    *
+    * @param jobManagerAddress Address of the new leading JobManager
+    * @param leaderSessionID New leader session ID
+    */
+  case class JobManagerLeaderAddress(jobManagerAddress: String, leaderSessionID: UUID)
+
 
   // --------------------------------------------------------------------------
   //  Utility getters for case objects to simplify access from Java
@@ -113,7 +124,7 @@ object TaskManagerMessages {
    * @return The NotifyWhenRegisteredAtJobManager case object instance.
    */
   def getNotifyWhenRegisteredAtJobManagerMessage:
-            NotifyWhenRegisteredAtJobManager.type = NotifyWhenRegisteredAtJobManager
+            NotifyWhenRegisteredAtAnyJobManager.type = NotifyWhenRegisteredAtAnyJobManager
 
   /**
    * Accessor for the case object instance, to simplify Java interoperability.

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
deleted file mode 100644
index a80ca99..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskMessages.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, TaskDeploymentDescriptor}
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, IntermediateResultPartitionID}
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState
-import org.apache.flink.runtime.taskmanager.TaskExecutionState
-
-/**
- * A set of messages that control the deployment and the state of Tasks executed
- * on the TaskManager.
- */
-object TaskMessages {
-
-  /**
-   * Marker trait for task messages.
-   */
-  trait TaskMessage
-
-  // --------------------------------------------------------------------------
-  //  Starting and stopping Tasks
-  // --------------------------------------------------------------------------
-
-  /**
-   * Submits a task to the task manager. The result is to this message is a
-   * [[TaskOperationResult]] message.
-   *
-   * @param tasks Descriptor which contains the information to start the task.
-   */
-  case class SubmitTask(tasks: TaskDeploymentDescriptor)
-    extends TaskMessage with RequiresLeaderSessionID
-
-  /**
-   * Cancels the task associated with [[attemptID]]. The result is sent back to the sender as a
-   * [[TaskOperationResult]] message.
-   *
-   * @param attemptID The task's execution attempt ID.
-   */
-  case class CancelTask(attemptID: ExecutionAttemptID)
-    extends TaskMessage with RequiresLeaderSessionID
-
-  /**
-   * Triggers a fail of specified task from the outside (as opposed to the task throwing
-   * an exception itself) with the given exception as the cause.
-   *
-   * @param executionID The task's execution attempt ID.
-   * @param cause The reason for the external failure.
-   */
-  case class FailTask(executionID: ExecutionAttemptID, cause: Throwable)
-    extends TaskMessage
-
-  /**
-   * Notifies the TaskManager that the task has reached its final state,
-   * either FINISHED, CANCELED, or FAILED.
-   *
-   * @param executionID The task's execution attempt ID.
-   */
-  case class TaskInFinalState(executionID: ExecutionAttemptID)
-    extends TaskMessage
-
-
-  // --------------------------------------------------------------------------
-  //  Updates to Intermediate Results
-  // --------------------------------------------------------------------------
-
-  /**
-   * Answer to a [[RequestPartitionState]] with the state of the respective partition.
-   */
-  case class PartitionState(
-      taskExecutionId: ExecutionAttemptID,
-      taskResultId: IntermediateDataSetID,
-      partitionId: IntermediateResultPartitionID,
-      state: ExecutionState)
-    extends TaskMessage with RequiresLeaderSessionID
-
-  /**
-   * Base class for messages that update the information about location of input partitions
-   */
-  abstract sealed class UpdatePartitionInfo extends TaskMessage with RequiresLeaderSessionID {
-    def executionID: ExecutionAttemptID
-  }
-
-  /**
-   *
-   * @param executionID The task's execution attempt ID.
-   * @param resultId The input reader to update.
-   * @param partitionInfo The partition info update.
-   */
-  case class UpdateTaskSinglePartitionInfo(
-      executionID: ExecutionAttemptID,
-      resultId: IntermediateDataSetID,
-      partitionInfo: InputChannelDeploymentDescriptor)
-    extends UpdatePartitionInfo
-
-  /**
-   *
-   * @param executionID The task's execution attempt ID.
-   * @param partitionInfos List of input gates with channel descriptors to update.
-   */
-  case class UpdateTaskMultiplePartitionInfos(
-      executionID: ExecutionAttemptID,
-      partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)])
-    extends UpdatePartitionInfo
-
-  /**
-   * Fails (and releases) all intermediate result partitions identified by
-   * [[executionID]] from the task manager.
-   *
-   * @param executionID The task's execution attempt ID.
-   */
-  case class FailIntermediateResultPartitions(executionID: ExecutionAttemptID)
-    extends TaskMessage with RequiresLeaderSessionID
-
-
-  // --------------------------------------------------------------------------
-  //  Report Messages
-  // --------------------------------------------------------------------------
-
-  /**
-   * Denotes a state change of a task at the JobManager. The update success is acknowledged by a
-   * boolean value which is sent back to the sender.
-   *
-   * @param taskExecutionState The changed task state
-   */
-  case class UpdateTaskExecutionState(taskExecutionState: TaskExecutionState)
-    extends TaskMessage with RequiresLeaderSessionID
-
-  /**
-   * Response message to updates in the task state. Send for example as a response to
-   *
-   *  - [[SubmitTask]]
-   *  - [[CancelTask]]
-   *
-   * @param executionID identifying the respective task
-   * @param success indicating whether the operation has been successful
-   * @param description Optional description for unsuccessful results.
-   */
-  case class TaskOperationResult(
-      executionID: ExecutionAttemptID,
-      success: Boolean,
-      description: String)
-    extends TaskMessage {
-    def this(executionID: ExecutionAttemptID, success: Boolean) = this(executionID, success, "")
-  }
-
-
-  // --------------------------------------------------------------------------
-  //  Utility Functions
-  // --------------------------------------------------------------------------
-
-  def createUpdateTaskMultiplePartitionInfos(
-      executionID: ExecutionAttemptID,
-      resultIDs: java.util.List[IntermediateDataSetID],
-      partitionInfos: java.util.List[InputChannelDeploymentDescriptor])
-    : UpdateTaskMultiplePartitionInfos = {
-
-    require(resultIDs.size() == partitionInfos.size(),
-      "ResultIDs must have the same length as partitionInfos.")
-
-    import scala.collection.JavaConverters.asScalaBufferConverter
-
-    new UpdateTaskMultiplePartitionInfos(executionID,
-      resultIDs.asScala.zip(partitionInfos.asScala))
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index c4c35f8..bbd011a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
+import java.util.UUID
 
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
@@ -26,22 +27,25 @@ import akka.actor.{ActorRef, ActorSystem}
 
 import com.typesafe.config.Config
 
-import org.apache.flink.api.common.{JobExecutionResult, JobSubmissionResult}
+import org.apache.flink.api.common.{JobID, JobExecutionResult, JobSubmissionResult}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.{JobExecutionException, JobClient}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
-import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
-import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.jobmanager.{JobManager, RecoveryMode}
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalService, LeaderRetrievalListener,
+StandaloneLeaderRetrievalService}
+import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtAnyJobManager
+import org.apache.flink.runtime.util.{StandaloneUtils, ZooKeeperUtils}
 import org.apache.flink.runtime.webmonitor.WebMonitor
 
 import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{ExecutionContext, Future, Await}
+import scala.concurrent._
 
 /**
  * Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -50,15 +54,16 @@ import scala.concurrent.{ExecutionContext, Future, Await}
  * actors can all be run in the same [[ActorSystem]] or each one in its own.
  *
  * @param userConfiguration Configuration object with the user provided configuration values
- * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
- *                          [[ActorSystem]], otherwise false
+ * @param useSingleActorSystem true if all actors (JobManager and TaskManager) shall be run in the
+ *                             same [[ActorSystem]], otherwise false
  * @param streamingMode True, if the system should be started in streaming mode, false if
  *                      in pure batch mode.
  */
 abstract class FlinkMiniCluster(
     val userConfiguration: Configuration,
-    val singleActorSystem: Boolean,
-    val streamingMode: StreamingMode) {
+    val useSingleActorSystem: Boolean,
+    val streamingMode: StreamingMode)
+  extends LeaderRetrievalListener {
 
   def this(userConfiguration: Configuration, singleActorSystem: Boolean) 
          = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
@@ -73,55 +78,110 @@ abstract class FlinkMiniCluster(
   // not getLocalHost(), which may be 127.0.1.1
   val hostname = InetAddress.getByName("localhost").getHostAddress()
 
-  val timeout = AkkaUtils.getTimeout(userConfiguration)
-
   val configuration = generateConfiguration(userConfiguration)
 
-  var jobManagerActorSystem = startJobManagerActorSystem()
-  var (jobManagerActor, webMonitor) = startJobManager(jobManagerActorSystem)
+  /** Future to the [[ActorGateway]] of the current leader */
+  var leaderGateway: Promise[ActorGateway] = Promise()
 
-  val numTaskManagers = configuration.getInteger(
-     ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
+  /** Future to the index of the current leader */
+  var leaderIndex: Promise[Int] = Promise()
 
-  var (taskManagerActorSystems, taskManagerActors) =
-    (for(i <- 0 until numTaskManagers) yield {
-      val actorSystem = if(singleActorSystem) {
-        jobManagerActorSystem
-      } else {
-        startTaskManagerActorSystem(i)
-      }
+  /** Future lock */
+  val futureLock = new Object()
+
+  implicit val executionContext = ExecutionContext.global
 
-      (actorSystem, startTaskManager(i, actorSystem))
-    }).unzip
+  implicit val timeout = AkkaUtils.getTimeout(userConfiguration)
+
+  val recoveryMode = RecoveryMode.valueOf(configuration.getString(
+    ConfigConstants.RECOVERY_MODE,
+    ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase)
+
+  val numJobManagers = getNumberOfJobManagers
+
+  val numTaskManagers = configuration.getInteger(
+    ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
+    ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
 
-  waitForTaskManagersToBeRegistered()
+  var jobManagerActorSystems: Option[Seq[ActorSystem]] = None
+  var jobManagerActors: Option[Seq[ActorRef]] = None
+  var webMonitor: Option[WebMonitor] = None
+  var taskManagerActorSystems: Option[Seq[ActorSystem]] = None
+  var taskManagerActors: Option[Seq[ActorRef]] = None
 
+  protected var leaderRetrievalService: Option[LeaderRetrievalService] = None
 
   // --------------------------------------------------------------------------
-  //                           Construction
+  //                           Abstract Methods
   // --------------------------------------------------------------------------
 
   def generateConfiguration(userConfiguration: Configuration): Configuration
 
-  def startJobManager(system: ActorSystem): (ActorRef, Option[WebMonitor])
+  def startJobManager(index: Int, system: ActorSystem): ActorRef
 
   def startTaskManager(index: Int, system: ActorSystem): ActorRef
 
-  def getJobManagerAkkaConfig: Config = {
-    if (singleActorSystem) {
+  // --------------------------------------------------------------------------
+  //                           Getters/Setters
+  // --------------------------------------------------------------------------
+
+  def getNumberOfJobManagers: Int = {
+    if(recoveryMode == RecoveryMode.STANDALONE) {
+      1
+    } else {
+      configuration.getInteger(
+        ConfigConstants.LOCAL_NUMBER_JOB_MANAGER,
+        ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER
+      )
+    }
+  }
+
+  def getJobManagersAsJava = {
+    import collection.JavaConverters._
+    jobManagerActors.getOrElse(Seq()).asJava
+  }
+
+  def getTaskManagers = {
+    taskManagerActors.getOrElse(Seq())
+  }
+
+  def getTaskManagersAsJava = {
+    import collection.JavaConverters._
+    taskManagerActors.getOrElse(Seq()).asJava
+  }
+
+  def getLeaderGatewayFuture: Future[ActorGateway] = {
+    leaderGateway.future
+  }
+
+  def getLeaderGateway(timeout: FiniteDuration): ActorGateway = {
+    val jmFuture = getLeaderGatewayFuture
+
+    Await.result(jmFuture, timeout)
+  }
+
+  def getLeaderIndexFuture: Future[Int] = {
+    leaderIndex.future
+  }
+
+  def getLeaderIndex(timeout: FiniteDuration): Int = {
+    val indexFuture = getLeaderIndexFuture
+
+    Await.result(indexFuture, timeout)
+  }
+
+  def getJobManagerAkkaConfig(index: Int): Config = {
+    if (useSingleActorSystem) {
       AkkaUtils.getAkkaConfig(configuration, None)
     }
     else {
       val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
         ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-      AkkaUtils.getAkkaConfig(configuration, Some((hostname, port)))
-    }
-  }
+      val resolvedPort = if(port != 0) port + index else port
 
-  def startJobManagerActorSystem(): ActorSystem = {
-    val config = getJobManagerAkkaConfig
-    AkkaUtils.createActorSystem(config)
+      AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
+    }
   }
 
   def getTaskManagerAkkaConfig(index: Int): Config = {
@@ -133,77 +193,95 @@ abstract class FlinkMiniCluster(
     AkkaUtils.getAkkaConfig(configuration, Some((hostname, resolvedPort)))
   }
 
+  // --------------------------------------------------------------------------
+  //                          Start/Stop Methods
+  // --------------------------------------------------------------------------
+
+  def startJobManagerActorSystem(index: Int): ActorSystem = {
+    val config = getJobManagerAkkaConfig(index)
+    AkkaUtils.createActorSystem(config)
+  }
+
   def startTaskManagerActorSystem(index: Int): ActorSystem = {
     val config = getTaskManagerAkkaConfig(index)
 
     AkkaUtils.createActorSystem(config)
   }
 
-  def getJobManagerGateway(): ActorGateway = {
-    // create ActorGateway from the JobManager's ActorRef
-    JobManager.getJobManagerGateway(jobManagerActor, timeout)
+  def startJobClientActorSystem(jobID: JobID): ActorSystem = {
+    if (useSingleActorSystem) {
+      jobManagerActorSystems match {
+        case Some(jmActorSystems) => jmActorSystems(0)
+        case None => throw new JobExecutionException(
+          jobID,
+          "The FlinkMiniCluster has not been started yet.")
+      }
+    } else {
+      JobClient.startJobClientActorSystem(configuration)
+    }
   }
 
-  def getTaskManagers = {
-    taskManagerActors
+  def start(): Unit = {
+    start(true)
   }
 
-  def getTaskManagersAsJava = {
-    import collection.JavaConverters._
-    taskManagerActors.asJava
-  }
+  def start(waitForTaskManagerRegistration: Boolean): Unit = {
+    LOG.info("Starting FlinkMiniCluster.")
 
-  def stop(): Unit = {
-    LOG.info("Stopping FlinkMiniCluster.")
-    shutdown()
-    awaitTermination()
-  }
+    lazy val singleActorSystem = startJobManagerActorSystem(0)
 
-  def shutdown(): Unit = {
-    webMonitor foreach {
-      _.stop()
-    }
+    val (jmActorSystems, jmActors) =
+      (for(i <- 0 until numJobManagers) yield {
+        val actorSystem = if(useSingleActorSystem) {
+          singleActorSystem
+        } else {
+          startJobManagerActorSystem(i)
+        }
+        (actorSystem, startJobManager(i, actorSystem))
+      }).unzip
 
-    val futures = taskManagerActors map {
-        gracefulStop(_, timeout)
-    }
+    jobManagerActorSystems = Some(jmActorSystems)
+    jobManagerActors = Some(jmActors)
 
-    val future = gracefulStop(jobManagerActor, timeout)
+    val lrs = createLeaderRetrievalService();
 
-    implicit val executionContext = ExecutionContext.global
+    leaderRetrievalService = Some(lrs)
+    lrs.start(this)
 
-    Await.ready(Future.sequence(future +: futures), timeout)
+    val (tmActorSystems, tmActors) =
+      (for(i <- 0 until numTaskManagers) yield {
+        val actorSystem = if(useSingleActorSystem) {
+          jmActorSystems(0)
+        } else {
+          startTaskManagerActorSystem(i)
+        }
 
-    if(!singleActorSystem){
-      taskManagerActorSystems foreach {
-        _.shutdown()
-      }
-    }
+        (actorSystem, startTaskManager(i, actorSystem))
+      }).unzip
 
-    jobManagerActorSystem.shutdown()
-  }
+    taskManagerActorSystems = Some(tmActorSystems)
+    taskManagerActors = Some(tmActors)
 
-  def awaitTermination(): Unit = {
-    jobManagerActorSystem.awaitTermination()
+    val jobManagerAkkaURL = AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0))
 
-    taskManagerActorSystems foreach {
-      _.awaitTermination()
+    webMonitor = startWebServer(configuration, jmActorSystems(0), jobManagerAkkaURL)
+
+    if(waitForTaskManagerRegistration) {
+      waitForTaskManagersToBeRegistered()
     }
   }
 
   def startWebServer(
       config: Configuration,
-      jobManager: ActorRef,
-      archiver: ActorRef)
+      actorSystem: ActorSystem,
+      jobManagerAkkaURL: String)
     : Option[WebMonitor] = {
     if(
-      config.getBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, false) &&
-      config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
-
-      val lookupTimeout = AkkaUtils.getLookupTimeout(config)
+      config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
+        config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
 
-      val jobManagerGateway = JobManager.getJobManagerGateway(jobManager, lookupTimeout)
-      val archiverGateway = new AkkaActorGateway(archiver, jobManagerGateway.leaderSessionID())
+      // TODO: Add support for HA: Make web server work independently from the JM
+      val leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManagerAkkaURL)
 
       // start the job manager web frontend
       val webServer = if (
@@ -214,11 +292,10 @@ abstract class FlinkMiniCluster(
         LOG.info("Starting NEW JobManger web frontend")
         // start the new web frontend. we need to load this dynamically
         // because it is not in the same project/dependencies
-        JobManager.startWebRuntimeMonitor(config, jobManagerGateway, archiverGateway)
-      }
-      else {
+        JobManager.startWebRuntimeMonitor(config, leaderRetrievalService, actorSystem)
+      } else {
         LOG.info("Starting JobManger web frontend")
-        new WebInfoServer(config, jobManagerGateway, archiverGateway)
+        new WebInfoServer(config, leaderRetrievalService, actorSystem)
       }
 
       webServer.start()
@@ -229,18 +306,89 @@ abstract class FlinkMiniCluster(
     }
   }
 
-  def waitForTaskManagersToBeRegistered(): Unit = {
+  def stop(): Unit = {
+    LOG.info("Stopping FlinkMiniCluster.")
+    shutdown()
+    awaitTermination()
+
+    leaderRetrievalService.foreach(_.stop())
+  }
+
+  protected def shutdown(): Unit = {
+    webMonitor foreach {
+      _.stop()
+    }
+    
+    val tmFutures = taskManagerActors map {
+      _.map(gracefulStop(_, timeout))
+    } getOrElse(Seq())
+
+
+    val jmFutures = jobManagerActors map {
+      _.map(gracefulStop(_, timeout))
+    } getOrElse(Seq())
+
     implicit val executionContext = ExecutionContext.global
 
-    val futures = taskManagerActors map {
-      taskManager => (taskManager ? NotifyWhenRegisteredAtJobManager)(timeout)
+    Await.ready(Future.sequence(jmFutures ++ tmFutures), timeout)
+
+    if (!useSingleActorSystem) {
+      taskManagerActorSystems foreach {
+        _ foreach(_.shutdown())
+      }
     }
 
+    jobManagerActorSystems foreach {
+      _ foreach(_.shutdown())
+    }
+  }
+
+  def awaitTermination(): Unit = {
+    jobManagerActorSystems foreach {
+      _ foreach(_.awaitTermination())
+    }
+
+    taskManagerActorSystems foreach {
+      _ foreach(_.awaitTermination())
+    }
+  }
+
+  // --------------------------------------------------------------------------
+  //                          Utility Methods
+  // --------------------------------------------------------------------------
+
+  /** Waits with the default timeout until all task managers have registered at the job manager
+    *
+    * @throws java.util.concurrent.TimeoutException
+    * @throws java.lang.InterruptedException
+    */
+  @throws(classOf[TimeoutException])
+  @throws(classOf[InterruptedException])
+  def waitForTaskManagersToBeRegistered(): Unit = {
+    waitForTaskManagersToBeRegistered(timeout)
+  }
+
+  /** Waits until all task managers have registered at the job manager until the timeout is reached.
+    *
+    * @param timeout
+    * @throws java.util.concurrent.TimeoutException
+    * @throws java.lang.InterruptedException
+    */
+  @throws(classOf[TimeoutException])
+  @throws(classOf[InterruptedException])
+  def waitForTaskManagersToBeRegistered(timeout: FiniteDuration): Unit = {
+    val futures = taskManagerActors map {
+      _ map(taskManager => (taskManager ? NotifyWhenRegisteredAtAnyJobManager)(timeout))
+    } getOrElse(Seq())
+
     Await.ready(Future.sequence(futures), timeout)
   }
 
   @throws(classOf[JobExecutionException])
-  def submitJobAndWait(jobGraph: JobGraph, printUpdates: Boolean): JobExecutionResult = {
+  def submitJobAndWait(
+      jobGraph: JobGraph,
+      printUpdates: Boolean)
+    : JobExecutionResult = {
     submitJobAndWait(jobGraph, printUpdates, timeout)
   }
   
@@ -251,25 +399,126 @@ abstract class FlinkMiniCluster(
       timeout: FiniteDuration)
     : JobExecutionResult = {
 
-    val clientActorSystem = if (singleActorSystem) jobManagerActorSystem
-    else JobClient.startJobClientActorSystem(configuration)
-
-    JobClient.submitJobAndWait(
-      clientActorSystem,
-      getJobManagerGateway(),
-      jobGraph,
-      timeout,
+    val clientActorSystem = startJobClientActorSystem(jobGraph.getJobID)
+
+     try {
+       val jobManagerGateway = try {
+           getLeaderGateway(timeout)
+         } catch {
+           case e: Exception => throw new JobExecutionException(
+             jobGraph.getJobID,
+             "Could not retrieve leading job manager gateway.",
+             e)
+         }
+
+     JobClient.submitJobAndWait(
+       clientActorSystem,
+       jobManagerGateway,
+       jobGraph,
+       timeout,
       printUpdates,
       this.getClass().getClassLoader())
+    } finally {
+       if(!useSingleActorSystem) {
+         // we have to shutdown the just created actor system
+         shutdownJobClientActorSystem(clientActorSystem)
+       }
+     }
   }
 
   @throws(classOf[JobExecutionException])
   def submitJobDetached(jobGraph: JobGraph) : JobSubmissionResult = {
-    JobClient.submitJobDetached(
-      getJobManagerGateway(),
-      jobGraph,
-      timeout,
-      getClass().getClassLoader())
+
+    val jobManagerGateway = try {
+      getLeaderGateway(timeout)
+    } catch {
+      case t: Throwable =>
+        throw new JobExecutionException(
+          jobGraph.getJobID,
+          "Could not retrieve JobManager ActorRef.",
+          t
+        )
+    }
+
+    JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, getClass().getClassLoader())
+
     new JobSubmissionResult(jobGraph.getJobID)
   }
+
+  def shutdownJobClientActorSystem(actorSystem: ActorSystem): Unit = {
+    if(!useSingleActorSystem) {
+      actorSystem.shutdown()
+    }
+  }
+
+  protected def createLeaderRetrievalService(): LeaderRetrievalService = {
+    (jobManagerActorSystems, jobManagerActors) match {
+      case (Some(jmActorSystems), Some(jmActors)) =>
+        if (recoveryMode == RecoveryMode.STANDALONE) {
+          new StandaloneLeaderRetrievalService(
+            AkkaUtils.getAkkaURL(jmActorSystems(0), jmActors(0)))
+        } else {
+          ZooKeeperUtils.createLeaderRetrievalService(configuration)
+        }
+
+      case _ => throw new Exception("The FlinkMiniCluster has not been started properly.")
+    }
+  }
+
+  protected def clearLeader(): Unit = {
+    futureLock.synchronized{
+      leaderGateway = Promise()
+      leaderIndex = Promise()
+    }
+  }
+
+  override def notifyLeaderAddress(address: String, leaderSessionID: UUID): Unit = {
+    if (address != null && !address.equals("")) {
+      // only accept leader addresses which are not null and non-empty
+
+      val selectedLeader = (jobManagerActorSystems, jobManagerActors) match {
+        case (Some(systems), Some(actors)) =>
+          val actorPaths = systems.zip(actors).zipWithIndex.map {
+            case ((system, actor), index) => (AkkaUtils.getAkkaURL(system, actor), actor, index)
+          }
+
+          actorPaths.find {
+            case (path, actor, index) => path.equals(address)
+          }.map(x => (x._2, x._3))
+        case _ => None
+      }
+
+      futureLock.synchronized {
+        if (leaderGateway.isCompleted) {
+          // assignments happen atomically and only here
+          leaderGateway = Promise()
+          leaderIndex = Promise()
+        }
+
+        selectedLeader match {
+          case Some((leader, index)) =>
+            leaderGateway.success(new AkkaActorGateway(leader, leaderSessionID))
+            leaderIndex.success(index)
+          case None =>
+            leaderGateway.failure(
+              new Exception(s"Could not find job manager with address ${address}."))
+            leaderIndex.failure(
+              new Exception(s"Could not find job manager index with address ${address}.")
+            )
+        }
+      }
+    }
+  }
+
+  override def handleError(exception: Exception): Unit = {
+    futureLock.synchronized {
+      if(leaderGateway.isCompleted) {
+        leaderGateway = Promise.failed(exception)
+        leaderIndex = Promise.failed(exception)
+      } else{
+        leaderGateway.failure(exception)
+        leaderIndex.failure(exception)
+      }
+    }
+  }
 }