You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/08/31 12:31:42 UTC

[06/10] flink git commit: [FLINK-2291] [runtime] Add ZooKeeper support to elect a leader from a set of JobManagers. The leader will then be retrieved from ZooKeeper by the TaskManagers.

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 54c457e..3f8783a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -18,32 +18,29 @@
 
 package org.apache.flink.runtime.minicluster
 
-import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorRef, ActorSystem}
 
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.client.JobClient
-import org.apache.flink.runtime.instance.AkkaActorGateway
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
-import org.apache.flink.runtime.webmonitor.WebMonitor
 
 import org.slf4j.LoggerFactory
 
 /**
  * Local Flink mini cluster which executes all [[TaskManager]]s and the [[JobManager]] in the same
- * JVM. It extends the [[FlinkMiniCluster]] by providing a [[JobClient]], having convenience
- * functions to setup Flink's configuration and implementations to create [[JobManager]] and
- * [[TaskManager]].
+ * JVM. It extends the [[FlinkMiniCluster]] by having convenience functions to setup Flink's
+ * configuration and implementations to create [[JobManager]] and [[TaskManager]].
  *
  * @param userConfiguration Configuration object with the user provided configuration values
  * @param singleActorSystem true if all actors (JobManager and TaskManager) shall be run in the same
  *                          [[ActorSystem]], otherwise false
+ * @param streamingMode Defines the execution mode of Flink's components (JobManager and
+ *                      TaskManager)
  */
 class LocalFlinkMiniCluster(
     userConfiguration: Configuration,
@@ -51,22 +48,12 @@ class LocalFlinkMiniCluster(
     streamingMode: StreamingMode)
   extends FlinkMiniCluster(userConfiguration, singleActorSystem, streamingMode) {
 
-  
   def this(userConfiguration: Configuration, singleActorSystem: Boolean)
        = this(userConfiguration, singleActorSystem, StreamingMode.BATCH_ONLY)
   
   def this(userConfiguration: Configuration) = this(userConfiguration, true)
 
   // --------------------------------------------------------------------------
-  
-  
-  val jobClientActorSystem = if (singleActorSystem) {
-    jobManagerActorSystem
-  } else {
-    // create an actor system listening on a random port
-    JobClient.startJobClientActorSystem(configuration)
-  }
-
 
   override def generateConfiguration(userConfiguration: Configuration): Configuration = {
     val config = getDefaultConfig
@@ -78,14 +65,28 @@ class LocalFlinkMiniCluster(
     config
   }
 
-  override def startJobManager(system: ActorSystem): (ActorRef, Option[WebMonitor]) = {
+  override def startJobManager(index: Int, system: ActorSystem): ActorRef = {
     val config = configuration.clone()
-       
-    val (jobManager, archiver) = JobManager.startJobManagerActors(config, system, streamingMode)
 
-    val webMonitorOption = startWebServer(config, jobManager, archiver)
+    val jobManagerName = getJobManagerName(index)
+    val archiveName = getArchiveName(index)
+
+    val jobManagerPort = config.getInteger(
+      ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+
+    if(jobManagerPort > 0) {
+      config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
+    }
 
-    (jobManager, webMonitorOption)
+    val (jobManager, _) = JobManager.startJobManagerActors(
+      config,
+      system,
+      Some(jobManagerName),
+      Some(archiveName),
+      streamingMode)
+
+    jobManager
   }
 
   override def startTaskManager(index: Int, system: ActorSystem): ActorRef = {
@@ -113,51 +114,32 @@ class LocalFlinkMiniCluster(
     } else {
       TaskManager.TASK_MANAGER_NAME
     }
-
-    val jobManagerPath: Option[String] = if (singleActorSystem) {
-      Some(jobManagerActor.path.toString)
-    } else {
-      None
-    }
     
     TaskManager.startTaskManagerComponentsAndActor(
       config,
       system,
       hostname, // network interface to bind to
       Some(taskManagerActorName), // actor name
-      jobManagerPath, // job manager akka URL
+      Some(createLeaderRetrievalService), // job manager leader retrieval service
       localExecution, // start network stack?
       streamingMode,
       classOf[TaskManager])
   }
 
-  def getJobClientActorSystem: ActorSystem = jobClientActorSystem
-
-  def getJobManagerRPCPort: Int = {
-    if (jobManagerActorSystem.isInstanceOf[ExtendedActorSystem]) {
-      val extActor = jobManagerActorSystem.asInstanceOf[ExtendedActorSystem]
-      extActor.provider.getDefaultAddress.port match {
-        case p: Some[Int] => p.get
-        case _ => -1
-      }
-    } else {
-      -1
-    }
-  }
+  def getLeaderRPCPort: Int = {
+    val index = getLeaderIndex(timeout)
 
-  override def shutdown(): Unit = {
-    super.shutdown()
+    jobManagerActorSystems match {
+      case Some(jmActorSystems) =>
+        AkkaUtils.getAddress(jmActorSystems(index)).port match {
+          case Some(p) => p
+          case None => -1
+        }
 
-    if (!singleActorSystem) {
-      jobClientActorSystem.shutdown()
+      case None => throw new Exception("The JobManager of the LocalFlinkMiniCluster has not been " +
+        "started properly.")
     }
-  }
 
-  override def awaitTermination(): Unit = {
-    if (!singleActorSystem) {
-      jobClientActorSystem.awaitTermination()
-    }
-    super.awaitTermination()
   }
 
   def initializeIOFormatClasses(configuration: Configuration): Unit = {
@@ -178,10 +160,10 @@ class LocalFlinkMiniCluster(
     if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {
 
       val bufferSizeNew: Int = config.getInteger(
-                                      ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
+        ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1)
 
       val bufferSizeOld: Int = config.getInteger(
-                                      ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
+        ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
       val bufferSize: Int =
         if (bufferSizeNew != -1) {
           bufferSizeNew
@@ -194,13 +176,15 @@ class LocalFlinkMiniCluster(
           bufferSizeOld
         }
       
-      val bufferMem: Long = config.getLong(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-          ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong
+      val bufferMem: Long = config.getLong(
+        ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+        ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong
 
       val numTaskManager = config.getInteger(
-        ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
+        ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
 
-      val memoryFraction = config.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+      val memoryFraction = config.getFloat(
+        ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
         ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
 
       // full memory size
@@ -218,16 +202,12 @@ class LocalFlinkMiniCluster(
     }
   }
 
-  def getConfiguration: Configuration = {
-    this.userConfiguration
-  }
-
   def getDefaultConfig: Configuration = {
     val config: Configuration = new Configuration()
 
     config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, hostname)
 
-    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
+    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
 
     // Reduce number of threads for local execution
     config.setInteger(NettyConfig.NUM_THREADS_CLIENT, 1)
@@ -235,8 +215,19 @@ class LocalFlinkMiniCluster(
 
     config
   }
-}
 
-object LocalFlinkMiniCluster {
-//  val LOG = LoggerFactory.getLogger(classOf[LocalFlinkMiniCluster])
+  protected def getJobManagerName(index: Int): String = {
+    if(singleActorSystem) {
+      JobManager.JOB_MANAGER_NAME + "_" + (index + 1)
+    } else {
+      JobManager.JOB_MANAGER_NAME
+    }
+  }
+  protected def getArchiveName(index: Int): String = {
+    if(singleActorSystem) {
+      JobManager.ARCHIVE_NAME + "_" + (index + 1)
+    } else {
+      JobManager.ARCHIVE_NAME
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index cc8b8ba..529d3d1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -39,8 +39,10 @@ import grizzled.slf4j.Logger
 import org.apache.flink.configuration._
 
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
+import org.apache.flink.runtime.messages.TaskMessages._
 import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage}
-import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessages, LogMessages, StreamingMode}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages, StreamingMode}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobService, BlobCache}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
@@ -55,17 +57,15 @@ import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
-import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.memorymanager.{MemoryManager, DefaultMemoryManager}
 import org.apache.flink.runtime.messages.Messages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages._
-import org.apache.flink.runtime.messages.TaskMessages._
 import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
-import org.apache.flink.runtime.util.{ZooKeeperUtil, MathUtils, EnvironmentInformation}
+import org.apache.flink.runtime.util.{LeaderRetrievalUtils, MathUtils, EnvironmentInformation}
 
 import scala.concurrent._
 import scala.concurrent.duration._
@@ -120,15 +120,15 @@ import scala.language.postfixOps
 class TaskManager(
     protected val config: TaskManagerConfiguration,
     protected val connectionInfo: InstanceConnectionInfo,
-    protected val jobManagerAkkaURL: String,
     protected val memoryManager: MemoryManager,
     protected val ioManager: IOManager,
     protected val network: NetworkEnvironment,
-    protected val numberOfSlots: Int)
+    protected val numberOfSlots: Int,
+    protected val leaderRetrievalService: LeaderRetrievalService)
   extends FlinkActor
-  with LeaderSessionMessages // Mixin order is important: second we want to filter leader messages
+  with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging
   with LogMessages // Mixin order is important: first we want to support message logging
-{
+  with LeaderRetrievalListener {
 
   override val log = Logger(getClass)
 
@@ -152,10 +152,12 @@ class TaskManager(
 
   /** Metric serialization */
   private val metricRegistryMapper: ObjectMapper = new ObjectMapper()
-        .registerModule(new MetricsModule(TimeUnit.SECONDS,
-                                          TimeUnit.MILLISECONDS,
-                                          false,
-                                          MetricFilter.ALL))
+    .registerModule(
+      new MetricsModule(
+        TimeUnit.SECONDS,
+        TimeUnit.MILLISECONDS,
+        false,
+        MetricFilter.ALL))
 
   /** Actors which want to be notified once this task manager has been
       registered at the job manager */
@@ -164,19 +166,18 @@ class TaskManager(
   private var blobService: Option[BlobService] = None
   private var libraryCacheManager: Option[LibraryCacheManager] = None
   protected var currentJobManager: Option[ActorRef] = None
-
+  private var jobManagerAkkaURL: Option[String] = None
+ 
   private var instanceID: InstanceID = null
 
   private var heartbeatScheduler: Option[Cancellable] = None
 
-  protected var leaderSessionID: Option[UUID] = None
+  var leaderSessionID: Option[UUID] = None
 
-  private val currentRegistrationSessionID: UUID = UUID.randomUUID()
 
   private val runtimeInfo = new TaskManagerRuntimeInfo(
        connectionInfo.getHostname(),
        new UnmodifiableConfiguration(config.configuration))
-
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------
@@ -196,18 +197,13 @@ class TaskManager(
       log.info(MemoryLogger.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
     }
 
-    // kick off the registration
-    val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
-
-    self ! decorateMessage(
-      TriggerTaskManagerRegistration(
-        currentRegistrationSessionID,
-        jobManagerAkkaURL,
-        TaskManager.INITIAL_REGISTRATION_TIMEOUT,
-        deadline,
-        1)
-    )
-
+    try {
+      leaderRetrievalService.start(this)
+    } catch {
+      case e: Exception =>
+        log.error("Could not start leader retrieval service.", e)
+        throw new RuntimeException("Could not start leader retrieval service.", e)
+    }
   }
 
   /**
@@ -229,6 +225,12 @@ class TaskManager(
     }
 
     try {
+      leaderRetrievalService.stop()
+    } catch {
+      case e: Exception => log.error("Leader retrieval service did not shut down properly.")
+    }
+
+    try {
       ioManager.shutdown()
     } catch {
       case t: Exception => log.error("I/O manager did not shutdown properly.", t)
@@ -266,6 +268,10 @@ class TaskManager(
     // messages for coordinating checkpoints
     case message: AbstractCheckpointMessage => handleCheckpointingMessage(message)
 
+    case JobManagerLeaderAddress(address, leaderSessionID) => {
+      handleJobManagerLeaderAddress(address, leaderSessionID)
+    }
+
     // registration messages for connecting and disconnecting from / to the JobManager
     case message: RegistrationMessage => handleRegistrationMessage(message)
 
@@ -279,7 +285,7 @@ class TaskManager(
 
     // registers the message sender to be notified once this TaskManager has completed
     // its registration at the JobManager
-    case NotifyWhenRegisteredAtJobManager =>
+    case NotifyWhenRegisteredAtAnyJobManager =>
       if (isConnected) {
         sender ! decorateMessage(RegisteredAtJobManager)
       } else {
@@ -290,6 +296,7 @@ class TaskManager(
     case Terminated(actor: ActorRef) =>
       if (isConnected && actor == currentJobManager.orNull) {
         handleJobManagerDisconnect(sender(), "JobManager is no longer reachable")
+        triggerTaskManagerRegistration()
       }
       else {
         log.warn(s"Received unrecognized disconnect message " +
@@ -298,6 +305,7 @@ class TaskManager(
 
     case Disconnect(msg) =>
       handleJobManagerDisconnect(sender(), "JobManager requested disconnect: " + msg)
+      triggerTaskManagerRegistration()
 
     case FatalError(message, cause) =>
       killTaskManagerFatal(message, cause)
@@ -361,7 +369,7 @@ class TaskManager(
         // was into a terminal state, or in case the JobManager cannot be informed of the
         // state transition
 
-      case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
+      case updateMsg @ UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) =>
 
           // we receive these from our tasks and forward them to the JobManager
           currentJobManager foreach {
@@ -379,7 +387,7 @@ class TaskManager(
                     FailTask(
                       executionID,
                       new Exception("Task has been cancelled on the JobManager."))
-                  )
+                    )
                   }
 
                 case Failure(t) =>
@@ -416,15 +424,15 @@ class TaskManager(
           val task = runningTasks.get(executionID)
           if (task != null) {
             task.cancelExecution()
-          sender ! decorateMessage(new TaskOperationResult(executionID, true))
+            sender ! decorateMessage(new TaskOperationResult(executionID, true))
           } else {
             log.debug(s"Cannot find task to cancel for execution ${executionID})")
-          sender ! decorateMessage(
-            new TaskOperationResult(
-              executionID,
-              false,
-            "No task with that execution ID was found.")
-          )
+            sender ! decorateMessage(
+              new TaskOperationResult(
+                executionID,
+                false,
+              "No task with that execution ID was found.")
+            )
           }
 
         case PartitionState(taskExecutionId, taskResultId, partitionId, state) =>
@@ -489,148 +497,148 @@ class TaskManager(
    * @param message The registration message.
    */
   private def handleRegistrationMessage(message: RegistrationMessage): Unit = {
-    if(message.registrationSessionID.equals(currentRegistrationSessionID)) {
-      message match {
-        case TriggerTaskManagerRegistration(
-          registrationSessionID,
-          jobManagerURL,
-          timeout,
-          deadline,
-          attempt) =>
+    message match {
+      case TriggerTaskManagerRegistration(
+        jobManagerURL,
+        timeout,
+        deadline,
+        attempt) =>
 
-          if (isConnected) {
-            // this may be the case, if we queue another attempt and
-            // in the meantime, the registration is acknowledged
-            log.debug(
-              "TaskManager was triggered to register at JobManager, but is already registered")
-          }
-          else if (deadline.exists(_.isOverdue())) {
-            // we failed to register in time. that means we should quit
-            log.error("Failed to register at the JobManager withing the defined maximum " +
-              "connect time. Shutting down ...")
+        if (isConnected) {
+          // this may be the case, if we queue another attempt and
+          // in the meantime, the registration is acknowledged
+          log.debug(
+            "TaskManager was triggered to register at JobManager, but is already registered")
+        }
+        else if (deadline.exists(_.isOverdue())) {
+          // we failed to register in time. that means we should quit
+          log.error("Failed to register at the JobManager withing the defined maximum " +
+            "connect time. Shutting down ...")
 
-            // terminate ourselves (hasta la vista)
-            self ! decorateMessage(PoisonPill)
+          // terminate ourselves (hasta la vista)
+          self ! decorateMessage(PoisonPill)
+        }
+        else {
+          if (!jobManagerAkkaURL.equals(Option(jobManagerURL))) {
+            throw new Exception("Invalid internal state: Trying to register at JobManager " +
+              s"${jobManagerURL} even though the current JobManagerAkkaURL is set to " +
+              s"${jobManagerAkkaURL.getOrElse("")}")
           }
-          else {
-            log.info(s"Trying to register at JobManager ${jobManagerURL} " +
-              s"(attempt ${attempt}, timeout: ${timeout})")
-
-            val jobManager = context.actorSelection(jobManagerAkkaURL)
-            jobManager ! decorateMessage(
-              RegisterTaskManager(
-                registrationSessionID,
-                self,
-                connectionInfo,
-                resources,
-                numberOfSlots)
-            )
 
-            // the next timeout computes via exponential backoff with cap
-            val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
+          log.info(s"Trying to register at JobManager ${jobManagerURL} " +
+            s"(attempt ${attempt}, timeout: ${timeout})")
 
-            // schedule (with our timeout s delay) a check triggers a new registration
-            // attempt, if we are not registered by then
-            context.system.scheduler.scheduleOnce(timeout) {
-              if (!isConnected) {
-                self ! decorateMessage(
-                  TriggerTaskManagerRegistration(
-                    registrationSessionID,
-                    jobManagerURL,
-                    nextTimeout,
-                    deadline,
-                    attempt + 1)
-                )
-              }
-            }(context.dispatcher)
-          }
+          val jobManager = context.actorSelection(jobManagerURL)
 
-        // successful registration. associate with the JobManager
-        // we disambiguate duplicate or erroneous messages, to simplify debugging
-        case AcknowledgeRegistration(_, leaderSessionID, jobManager, id, blobPort) =>
-          if (isConnected) {
-            if (jobManager == currentJobManager.orNull) {
-              log.debug("Ignoring duplicate registration acknowledgement.")
-            } else {
-              log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
-                s"because the TaskManager is already registered at ${currentJobManager.orNull}")
-            }
+          jobManager ! decorateMessage(
+            RegisterTaskManager(
+              connectionInfo,
+              resources,
+              numberOfSlots)
+          )
+
+          // the next timeout computes via exponential backoff with cap
+          val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT)
+
+          // schedule (with our timeout s delay) a check triggers a new registration
+          // attempt, if we are not registered by then
+          context.system.scheduler.scheduleOnce(
+            timeout,
+            self,
+            decorateMessage(TriggerTaskManagerRegistration(
+              jobManagerURL,
+              nextTimeout,
+              deadline,
+              attempt + 1)
+            ))(context.dispatcher)
+        }
+
+      // successful registration. associate with the JobManager
+      // we disambiguate duplicate or erroneous messages, to simplify debugging
+      case AcknowledgeRegistration(id, blobPort) =>
+        val jobManager = sender()
+
+        if (isConnected) {
+          if (jobManager == currentJobManager.orNull) {
+            log.debug("Ignoring duplicate registration acknowledgement.")
+          } else {
+            log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " +
+              s"because the TaskManager is already registered at ${currentJobManager.orNull}")
           }
-          else {
-            // not yet connected, so let's associate with that JobManager
-            try {
-              associateWithJobManager(jobManager, id, blobPort, leaderSessionID)
-            } catch {
-              case t: Throwable =>
-                killTaskManagerFatal(
-                  "Unable to start TaskManager components after registering at JobManager", t)
-            }
+        }
+        else {
+          // not yet connected, so let's associate with that JobManager
+          try {
+            associateWithJobManager(jobManager, id, blobPort)
+          } catch {
+            case t: Throwable =>
+              killTaskManagerFatal(
+                "Unable to start TaskManager components after registering at JobManager", t)
           }
+        }
 
-        // we are already registered at that specific JobManager - duplicate answer, rare cases
-        case AlreadyRegistered(_, leaderSesssionID, jobManager, id, blobPort) =>
-          if (isConnected) {
-            if (jobManager == currentJobManager.orNull) {
-              log.debug("Ignoring duplicate registration acknowledgement.")
-            } else {
-              log.warn(s"Received 'AlreadyRegistered' message from " +
-                s"JobManager ${jobManager.path}, even through TaskManager is currently " +
-                s"registered at ${currentJobManager.orNull}")
-            }
-          }
-          else {
-            // not connected, yet, to let's associate
-            log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
+      // we are already registered at that specific JobManager - duplicate answer, rare cases
+      case AlreadyRegistered(id, blobPort) =>
+        val jobManager = sender()
 
-            try {
-              associateWithJobManager(jobManager, id, blobPort, leaderSesssionID)
-            } catch {
-              case t: Throwable =>
-                killTaskManagerFatal(
-                  "Unable to start TaskManager components after registering at JobManager", t)
-            }
+        if (isConnected) {
+          if (jobManager == currentJobManager.orNull) {
+            log.debug("Ignoring duplicate registration acknowledgement.")
+          } else {
+            log.warn(s"Received 'AlreadyRegistered' message from " +
+              s"JobManager ${jobManager.path}, even through TaskManager is currently " +
+              s"registered at ${currentJobManager.orNull}")
           }
+        }
+        else {
+          // not connected, yet, to let's associate
+          log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'")
+
+          try {
+            associateWithJobManager(jobManager, id, blobPort)
+          } catch {
+            case t: Throwable =>
+              killTaskManagerFatal(
+                "Unable to start TaskManager components after registering at JobManager", t)
+          }
+        }
 
-        case RefuseRegistration(registrationSessionID, reason) =>
-          if (currentJobManager.isEmpty) {
-            log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
-              s"because: ${reason}. Retrying later...")
-
-            // try the registration again after some time
+      case RefuseRegistration(reason) =>
+        if (currentJobManager.isEmpty) {
+          log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " +
+            s"because: ${reason}. Retrying later...")
 
-            val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
-            val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
-              timeout => timeout + delay fromNow
-            }
+        if(jobManagerAkkaURL.isDefined) {
+          // try the registration again after some time
+          val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION
+          val deadline: Option[Deadline] = config.maxRegistrationDuration.map {
+            timeout => timeout + delay fromNow
+          }
 
-            context.system.scheduler.scheduleOnce(delay) {
-              self ! decorateMessage(
-                TriggerTaskManagerRegistration(
-                  registrationSessionID,
-                  jobManagerAkkaURL,
-                  TaskManager.INITIAL_REGISTRATION_TIMEOUT,
-                  deadline,
-                  1)
-              )
-            }(context.dispatcher)
+          context.system.scheduler.scheduleOnce(delay) {
+            self ! decorateMessage(
+              TriggerTaskManagerRegistration(
+                jobManagerAkkaURL.get,
+                TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+                deadline,
+                1)
+            )
+          }(context.dispatcher)
+        }
+      }
+        else {
+          // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
+          if (sender() == currentJobManager.orNull) {
+            log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
+              s" even though this TaskManager is already registered there.")
           }
           else {
-            // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration
-            if (sender() == currentJobManager.orNull) {
-              log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" +
-                s" even though this TaskManager is already registered there.")
-            }
-            else {
-              log.warn(s"Ignoring 'RefuseRegistration' from unrelated " +
-                s"JobManager (${sender().path})")
-            }
+            log.warn(s"Ignoring 'RefuseRegistration' from unrelated " +
+              s"JobManager (${sender().path})")
           }
+        }
 
-        case _ => unhandled(message)
-      }
-    } else {
-      log.debug(s"Discarded registration message ${message}, because the registration session " +
-        "ID was not correct.")
+      case _ => unhandled(message)
     }
   }
 
@@ -644,7 +652,7 @@ class TaskManager(
    *
    * @return True, if the TaskManager is currently connected to a JobManager, false otherwise.
    */
-  private def isConnected : Boolean = currentJobManager.isDefined
+  protected def isConnected : Boolean = currentJobManager.isDefined
 
   /**
    * Associates the TaskManager with the given JobManager. After this
@@ -655,13 +663,11 @@ class TaskManager(
    * @param id The instanceID under which the TaskManager is registered
    *           at the JobManager.
    * @param blobPort The JobManager's port for the BLOB server.
-   * @param newLeaderSessionID Leader session ID of the JobManager
    */
   private def associateWithJobManager(
       jobManager: ActorRef,
       id: InstanceID,
-      blobPort: Int,
-      newLeaderSessionID: UUID)
+      blobPort: Int)
     : Unit = {
 
     if (jobManager == null) {
@@ -673,10 +679,6 @@ class TaskManager(
     if (blobPort <= 0 || blobPort > 65535) {
       throw new IllegalArgumentException("blob port is out of range: " + blobPort)
     }
-    if(newLeaderSessionID == null) {
-      throw new NullPointerException("Leader session ID must not be null.")
-    }
-
 
     // sanity check that we are not currently registered with a different JobManager
     if (isConnected) {
@@ -703,13 +705,12 @@ class TaskManager(
 
     currentJobManager = Some(jobManager)
     instanceID = id
-    leaderSessionID = Some(newLeaderSessionID)
 
     // start the network stack, now that we have the JobManager actor reference
     try {
       network.associateWithTaskManagerAndJobManager(
-        new AkkaActorGateway(jobManager, leaderSessionID),
-        new AkkaActorGateway(self, leaderSessionID)
+        new AkkaActorGateway(jobManager, leaderSessionID.orNull),
+        new AkkaActorGateway(self, leaderSessionID.orNull)
       )
 
 
@@ -791,7 +792,7 @@ class TaskManager(
 
     // de-register from the JobManager (faster detection of disconnect)
     currentJobManager foreach {
-      _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is shutting down."))
+      _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is disassociating"))
     }
 
     currentJobManager = None
@@ -812,13 +813,14 @@ class TaskManager(
     network.disassociate()
   }
 
-  private def handleJobManagerDisconnect(jobManager: ActorRef, msg: String): Unit = {
+  protected def handleJobManagerDisconnect(jobManager: ActorRef, msg: String): Unit = {
     if (isConnected && jobManager != null) {
 
       // check if it comes from our JobManager
       if (jobManager == currentJobManager.orNull) {
         try {
-          val message = "Disconnecting from JobManager: " + msg
+          val message = s"TaskManager ${self.path} disconnects from JobManager " +
+            s"${jobManager.path}: " + msg
           log.info(message)
 
           // cancel all our tasks with a proper error message
@@ -826,17 +828,6 @@ class TaskManager(
 
           // reset our state to disassociated
           disassociateFromJobManager()
-
-          // begin attempts to reconnect
-          val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
-          self ! decorateMessage(
-            TriggerTaskManagerRegistration(
-              currentRegistrationSessionID,
-              jobManagerAkkaURL,
-              TaskManager.INITIAL_REGISTRATION_TIMEOUT,
-              deadline,
-              1)
-          )
         }
         catch {
           // this is pretty bad, it leaves the TaskManager in a state where it cannot
@@ -883,8 +874,8 @@ class TaskManager(
       // create the task. this does not grab any TaskManager resources or download
       // and libraries - the operation does not block
 
-      val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID)
-      val selfGateway = new AkkaActorGateway(self, leaderSessionID)
+      val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID.orNull)
+      val selfGateway = new AkkaActorGateway(self, leaderSessionID.orNull)
 
       val task = new Task(
         tdd,
@@ -1108,6 +1099,58 @@ class TaskManager(
 
     self ! decorateMessage(Kill)
   }
+
+  override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = {
+    self ! JobManagerLeaderAddress(leaderAddress, leaderSessionID) // process as an actor message
+  }
+
+  /** Handles the notification about a new leader and its address. If the TaskManager is still
+    * connected to another JobManager, it first disconnects from it. If the new JobManager
+    * address is not null, then it starts the registration process.
+    *
+    * @param newJobManagerAkkaURL Akka URL of the new leader JobManager; may be null
+    * @param leaderSessionID Leader session ID of the new leader; may be null
+    */
+  private def handleJobManagerLeaderAddress(
+      newJobManagerAkkaURL: String,
+      leaderSessionID: UUID)
+    : Unit = {
+
+    currentJobManager match {
+      case Some(jm) =>
+        handleJobManagerDisconnect(jm, s"JobManager ${newJobManagerAkkaURL} was elected as leader.")
+      case None => // not connected to any JobManager, nothing to disconnect from
+    }
+
+    this.jobManagerAkkaURL = Option(newJobManagerAkkaURL) // a null URL becomes None
+    this.leaderSessionID = Option(leaderSessionID)
+
+    triggerTaskManagerRegistration()
+  }
+
+  /** Starts the TaskManager's registration process to connect to the JobManager.
+    * Does nothing if no JobManager leader address is currently known.
+    */
+  def triggerTaskManagerRegistration(): Unit = {
+    if(jobManagerAkkaURL.isDefined) {
+      // begin attempts to register at the currently known leader JobManager
+      val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow)
+
+      self ! decorateMessage(
+        TriggerTaskManagerRegistration(
+          jobManagerAkkaURL.get,
+          TaskManager.INITIAL_REGISTRATION_TIMEOUT,
+          deadline,
+          1) // presumably the first registration attempt - confirm
+      )
+    }
+  }
+
+  override def handleError(exception: Exception): Unit = {
+    log.error("Error in leader retrieval service", exception)
+
+    self ! decorateMessage(PoisonPill) // stop this TaskManager actor
+  }
 }
 
 /**
@@ -1278,12 +1321,7 @@ object TaskManager {
       taskManagerClass: Class[_ <: TaskManager])
     : Unit = {
 
-    val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
-
-    val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort(
-      configuration,
-      jobManagerHostname,
-      jobManagerPort)
+    val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort(configuration)
 
     runTaskManager(
       taskManagerHostname,
@@ -1296,9 +1334,7 @@ object TaskManager {
   @throws(classOf[IOException])
   @throws(classOf[IllegalConfigurationException])
   def selectNetworkInterfaceAndPort(
-      configuration: Configuration,
-      jobManagerHostname: String,
-      jobManagerPort: Int)
+      configuration: Configuration)
     : (String, Int) = {
 
     var taskManagerHostname = configuration.getString(
@@ -1308,25 +1344,12 @@ object TaskManager {
       LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname)
     }
     else {
-      // try to find out the hostname of the interface from which the TaskManager
-      // can connect to the JobManager. This involves a reverse name lookup
-      LOG.info("Trying to select the network interface and address to use " +
-        "by connecting to the configured JobManager.")
-
-      LOG.info(s"TaskManager will try to connect for $MAX_STARTUP_CONNECT_TIME milliseconds " +
-        s"before falling back to heuristics")
-
-      val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
-      val taskManagerAddress = try {
-        // try to get the address for up to two minutes and start
-        // logging only after ten seconds
-        NetUtils.findConnectingAddress(jobManagerAddress,
-          MAX_STARTUP_CONNECT_TIME, STARTUP_CONNECT_LOG_SUPPRESS)
-      }
-      catch {
-        case t: Throwable => throw new IOException("TaskManager cannot find a network interface " +
-          "that can communicate with the JobManager (" + jobManagerAddress + ")", t)
-      }
+      val leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
+      val lookupTimeout = AkkaUtils.getLookupTimeout(configuration)
+
+      val taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
+        leaderRetrievalService,
+        lookupTimeout)
 
       taskManagerHostname = taskManagerAddress.getHostName()
       LOG.info(s"TaskManager will use hostname/address '${taskManagerHostname}' " +
@@ -1439,7 +1462,8 @@ object TaskManager {
         taskManagerSystem,
         taskManagerHostname,
         Some(TASK_MANAGER_NAME),
-        None, false,
+        None,
+        false,
         streamingMode,
         taskManagerClass)
 
@@ -1489,9 +1513,9 @@ object TaskManager {
    * @param taskManagerHostname The hostname/address that describes the TaskManager's data location.
    * @param taskManagerActorName Optionally the name of the TaskManager actor. If none is given,
    *                             the actor will use a random name.
-   * @param jobManagerPath Optionally, the JobManager actor path may be provided. If none is
-   *                       provided, the method will construct it automatically from the
-   *                       JobManager hostname an port specified in the configuration.
+   * @param leaderRetrievalServiceOption Optionally, a leader retrieval service can be provided. If
+   *                                     none is given, then a LeaderRetrievalService is
+   *                                     constructed from the configuration.
    * @param localTaskManagerCommunication If true, the TaskManager will not initiate the
    *                                      TCP network stack.
    * @param streamingMode The streaming mode to start the TaskManager in (streaming/batch-only)
@@ -1516,20 +1540,13 @@ object TaskManager {
       actorSystem: ActorSystem,
       taskManagerHostname: String,
       taskManagerActorName: Option[String],
-      jobManagerPath: Option[String],
+      leaderRetrievalServiceOption: Option[LeaderRetrievalService],
       localTaskManagerCommunication: Boolean,
       streamingMode: StreamingMode,
       taskManagerClass: Class[_ <: TaskManager])
     : ActorRef = {
 
-    // get and check the JobManager config
-    val jobManagerAkkaUrl: String = jobManagerPath.getOrElse {
-      val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
-      val hostPort = new InetSocketAddress(jobManagerHostname, jobManagerPort)
-      JobManager.getRemoteJobManagerAkkaURL(hostPort)
-    }
-
-    val (taskManagerConfig : TaskManagerConfiguration,
+    val (taskManagerConfig : TaskManagerConfiguration,      
       netConfig: NetworkEnvironmentConfiguration,
       connectionInfo: InstanceConnectionInfo
     ) = parseTaskManagerConfiguration(
@@ -1596,9 +1613,9 @@ object TaskManager {
     // start the I/O manager last, it will create some temp directories.
     val ioManager: IOManager = new IOManagerAsync(taskManagerConfig.tmpDirPaths)
 
-    if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
-      // TODO @removeme @tillrohrmann Setup leader retrieval service
-      LOG.info("HA mode.")
+    val leaderRetrievalService = leaderRetrievalServiceOption match {
+      case Some(lrs) => lrs
+      case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
     }
 
     // create the actor properties (which define the actor constructor parameters)
@@ -1606,11 +1623,11 @@ object TaskManager {
       taskManagerClass,
       taskManagerConfig,
       connectionInfo,
-      jobManagerAkkaUrl,
       memoryManager,
       ioManager,
       network,
-      taskManagerConfig.numberOfSlots)
+      taskManagerConfig.numberOfSlots,
+      leaderRetrievalService)
 
     taskManagerActorName match {
       case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
@@ -1639,7 +1656,7 @@ object TaskManager {
       timeout: FiniteDuration)
     : ActorRef = {
     try {
-      val future = AkkaUtils.getReference(taskManagerUrl, system, timeout)
+      val future = AkkaUtils.getActorRefFuture(taskManagerUrl, system, timeout)
       Await.result(future, timeout)
     }
     catch {
@@ -1756,8 +1773,13 @@ object TaskManager {
     val nettyConfig = if (localTaskManagerCommunication) {
       None
     } else {
-      Some(new NettyConfig(
-        connectionInfo.address(), connectionInfo.dataPort(), pageSize, configuration))
+      Some(
+        new NettyConfig(
+          connectionInfo.address(),
+          connectionInfo.dataPort(),
+          pageSize,
+          configuration)
+      )
     }
 
     // Default spill I/O mode for intermediate results
@@ -1822,7 +1844,7 @@ object TaskManager {
    * @param configuration The configuration to read the config values from.
    * @return A 2-tuple (hostname, port).
    */
-  private def getAndCheckJobManagerAddress(configuration: Configuration) : (String, Int) = {
+  def getAndCheckJobManagerAddress(configuration: Configuration) : (String, Int) = {
 
     val hostname = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
index 324b014..3ffc68f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import scala.Option;
 
 import java.util.UUID;
 
@@ -51,10 +50,14 @@ public class FlinkUntypedActorTest {
 		JavaTestKit.shutdownActorSystem(actorSystem);
 	}
 
+	/**
+	 * Tests that LeaderSessionMessage messages with a wrong leader session ID are filtered
+	 * out.
+	 */
 	@Test
 	public void testLeaderSessionMessageFilteringOfFlinkUntypedActor() {
-		final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
-		final Option<UUID> oldSessionID = Option.apply(UUID.randomUUID());
+		final UUID leaderSessionID = UUID.randomUUID();
+		final UUID oldSessionID = UUID.randomUUID();
 
 		TestActorRef<PlainFlinkUntypedActor> actor = null;
 
@@ -76,9 +79,13 @@ public class FlinkUntypedActorTest {
 		}
 	}
 
+	/**
+	 * Tests that an exception is thrown when the FlinkUntypedActor receives a message which
+	 * extends {@link RequiresLeaderSessionID} and is not wrapped in a LeaderSessionMessage.
+	 */
 	@Test
 	public void testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDMessage() {
-		final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID());
+		final UUID leaderSessionID = UUID.randomUUID();
 
 		TestActorRef<PlainFlinkUntypedActor> actor = null;
 
@@ -95,7 +102,8 @@ public class FlinkUntypedActorTest {
 						"message was sent without being wrapped in LeaderSessionMessage.");
 			} catch (Exception e) {
 				assertEquals("Received a message PlainRequiresLeaderSessionID " +
-						"without a leader session ID, even though it requires to have one.",
+						"without a leader session ID, even though the message requires a " +
+						"leader session ID.",
 						e.getMessage());
 			}
 
@@ -110,14 +118,14 @@ public class FlinkUntypedActorTest {
 		}
 	}
 
-
 	static class PlainFlinkUntypedActor extends FlinkUntypedActor {
 
-		private Option<UUID> leaderSessionID;
+		private UUID leaderSessionID;
 
 		private int messageCounter;
 
-		public PlainFlinkUntypedActor(Option<UUID> leaderSessionID) {
+		public PlainFlinkUntypedActor(UUID leaderSessionID) {
+
 			this.leaderSessionID = leaderSessionID;
 			this.messageCounter = 0;
 		}
@@ -128,7 +136,7 @@ public class FlinkUntypedActorTest {
 		}
 
 		@Override
-		protected Option<UUID> getLeaderSessionID() {
+		protected UUID getLeaderSessionID() {
 			return leaderSessionID;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index b124304..f6e4ab8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.jobmanager.Tasks;
 
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
 import scala.concurrent.Await;
@@ -49,8 +51,9 @@ public class CoordinatorShutdownTest {
 		LocalFlinkMiniCluster cluster = null;
 		try {
 			Configuration noTaskManagerConfig = new Configuration();
-			noTaskManagerConfig.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 0);
+			noTaskManagerConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 0);
 			cluster = new LocalFlinkMiniCluster(noTaskManagerConfig, true);
+			cluster.start();
 			
 			// build a test graph with snapshotting enabled
 			JobVertex vertex = new JobVertex("Test Vertex");
@@ -60,17 +63,19 @@ public class CoordinatorShutdownTest {
 			JobGraph testGraph = new JobGraph("test job", vertex);
 			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
 			
-			ActorGateway jobManager = cluster.getJobManagerGateway();
+			ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
 			FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
-			JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false);
+			JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(
+					testGraph,
+					ListeningBehaviour.EXECUTION_RESULT);
 			
 			// submit is successful, but then the job dies because no TaskManager / slot is available
-			Future<Object> submitFuture = jobManager.ask(submitMessage, timeout);
+			Future<Object> submitFuture = jmGateway.ask(submitMessage, timeout);
 			Await.result(submitFuture, timeout);
 
 			// get the execution graph and make sure the coordinator is properly shut down
-			Future<Object> jobRequestFuture = jobManager.ask(
+			Future<Object> jobRequestFuture = jmGateway.ask(
 					new JobManagerMessages.RequestJob(testGraph.getJobID()),
 					timeout);
 			
@@ -99,6 +104,7 @@ public class CoordinatorShutdownTest {
 		LocalFlinkMiniCluster cluster = null;
 		try {
 			cluster = new LocalFlinkMiniCluster(new Configuration(), true);
+			cluster.start();
 			
 			// build a test graph with snapshotting enabled
 			JobVertex vertex = new JobVertex("Test Vertex");
@@ -108,17 +114,19 @@ public class CoordinatorShutdownTest {
 			JobGraph testGraph = new JobGraph("test job", vertex);
 			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
 			
-			ActorGateway jobManager = cluster.getJobManagerGateway();
+			ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
 			FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
-			JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false);
+			JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(
+					testGraph,
+					ListeningBehaviour.EXECUTION_RESULT);
 
 			// submit is successful, but then the job dies because no TaskManager / slot is available
-			Future<Object> submitFuture = jobManager.ask(submitMessage, timeout);
+			Future<Object> submitFuture = jmGateway.ask(submitMessage, timeout);
 			Await.result(submitFuture, timeout);
 
 			// get the execution graph and make sure the coordinator is properly shut down
-			Future<Object> jobRequestFuture = jobManager.ask(
+			Future<Object> jobRequestFuture = jmGateway.ask(
 					new JobManagerMessages.RequestJob(testGraph.getJobID()),
 					timeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index e9b67af..2daa62e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -35,7 +35,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
@@ -574,9 +575,9 @@ public class ExecutionVertexCancelTest {
 		@Override
 		public Object handleMessage(Object message) throws Exception {
 			Object result;
-			if(message instanceof TaskMessages.SubmitTask) {
+			if(message instanceof SubmitTask) {
 				result = Messages.getAcknowledge();
-			} else if(message instanceof TaskMessages.CancelTask) {
+			} else if(message instanceof CancelTask) {
 				index++;
 
 				if(index >= results.length){

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
index 2e62781..62bc96b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.instance;
 
 import akka.actor.ActorRef;
 import akka.dispatch.Futures;
-import scala.Option;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -114,7 +113,7 @@ abstract public class BaseTestingActorGateway implements ActorGateway {
 	}
 
 	@Override
-	public Option<UUID> leaderSessionID() {
-		return Option.empty();
+	public UUID leaderSessionID() {
+		return null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
index 10762f2..3d27611 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.instance;
 
 import akka.actor.ActorRef;
-import scala.Option;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -62,7 +61,7 @@ public class DummyActorGateway implements ActorGateway {
 	}
 
 	@Override
-	public Option<UUID> leaderSessionID() {
-		return Option.<UUID>empty();
+	public UUID leaderSessionID() {
+		return null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 2c01f1f..a0166eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -42,8 +42,6 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import scala.Option;
-import scala.Some;
 
 /**
  * Tests for {@link org.apache.flink.runtime.instance.InstanceManager}.
@@ -52,7 +50,7 @@ public class InstanceManagerTest{
 
 	static ActorSystem system;
 
-	static Option<UUID> leaderSessionID = new Some<UUID>(UUID.randomUUID());
+	static UUID leaderSessionID = UUID.randomUUID();
 
 	@BeforeClass
 	public static void setup(){

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 6978225..1515f83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.InfiniteSubpartitionView;
 import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
-import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index e9f3a62..fdbffaa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -51,13 +51,15 @@ public class PartialConsumePipelinedResultTest {
 	@BeforeClass
 	public static void setUp() throws Exception {
 		final Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
 		config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, NUMBER_OF_NETWORK_BUFFERS);
 
 		flink = new TestingCluster(config, true);
 
+		flink.start();
+
 		jobClient = JobClient.startJobClientActorSystem(flink.configuration());
 	}
 
@@ -102,7 +104,7 @@ public class PartialConsumePipelinedResultTest {
 
 		JobClient.submitJobAndWait(
 				jobClient,
-				flink.getJobManagerGateway(),
+				flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
 				jobGraph,
 				TestingUtils.TESTING_DURATION(),
 				false, this.getClass().getClassLoader());

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 938c2ad..cd56318 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -24,13 +24,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
index be73bf5..b2aaab0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -101,7 +101,7 @@ public class JobManagerProcessReapingTest {
 			Throwable lastError = null;
 			for (int i = 0; i < 40; i++) {
 				try {
-					jobManagerRef = JobManager.getJobManagerRemoteReference(
+					jobManagerRef = JobManager.getJobManagerActorRef(
 							new InetSocketAddress("localhost", jobManagerPort),
 							localSystem, new FiniteDuration(25, TimeUnit.SECONDS));
 					break;

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 217f46e..db6df75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -26,6 +26,7 @@ import com.typesafe.config.Config;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -49,16 +50,15 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.Execution
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
 
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import scala.Option;
 import scala.Some;
 import scala.Tuple2;
 
 import java.net.InetAddress;
-import java.util.UUID;
 
 import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
 import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
@@ -121,13 +121,18 @@ public class JobManagerTest {
 				final JobGraph jobGraph = new JobGraph("Blocking test job", sender);
 				final JobID jid = jobGraph.getJobID();
 
-				final ActorGateway jobManagerGateway = cluster.getJobManagerGateway();
+				final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+						TestingUtils.TESTING_DURATION());
 
 				// we can set the leader session ID to None because we don't use this gateway to send messages
-				final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), Option.<UUID>empty());
+				final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
 
 				// Submit the job and wait for all vertices to be running
-				jobManagerGateway.tell(new SubmitJob(jobGraph, false), testActorGateway);
+				jobManagerGateway.tell(
+						new SubmitJob(
+								jobGraph,
+								ListeningBehaviour.EXECUTION_RESULT),
+						testActorGateway);
 				expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
 
 				jobManagerGateway.tell(

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 560fc30..bba1460 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -20,16 +20,21 @@ package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -54,13 +59,18 @@ public class JobSubmitTest {
 	private static final FiniteDuration timeout = new FiniteDuration(5000, TimeUnit.MILLISECONDS);
 
 	private static ActorSystem jobManagerSystem;
-	private static ActorGateway jobManager;
+	private static ActorGateway jmGateway;
 
 	@BeforeClass
 	public static void setupJobManager() {
 		Configuration config = new Configuration();
 
-		scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.empty();
+		int port = NetUtils.getAvailablePort();
+
+		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
+
+		scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.apply(new Tuple2<String, Object>("localhost", port));
 		jobManagerSystem = AkkaUtils.createActorSystem(config, listeningAddress);
 		ActorRef jobManagerActorRef = JobManager.startJobManagerActors(
 				config,
@@ -68,12 +78,16 @@ public class JobSubmitTest {
 				StreamingMode.BATCH_ONLY)._1();
 
 		try {
-			jobManager = JobManager.getJobManagerGateway(jobManagerActorRef, timeout);
+			LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+
+			jmGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+					lrs,
+					jobManagerSystem,
+					timeout
+			);
 		} catch (Exception e) {
 			fail("Could not retrieve the JobManager gateway. " + e.getMessage());
 		}
-
-
 	}
 
 	@AfterClass
@@ -92,7 +106,7 @@ public class JobSubmitTest {
 			JobGraph jg = new JobGraph("test job", jobVertex);
 
 			// request the blob port from the job manager
-			Future<Object> future = jobManager.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
+			Future<Object> future = jmGateway.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
 			int blobPort = (Integer) Await.result(future, timeout);
 
 			// upload two dummy bytes and add their keys to the job graph as dependencies
@@ -113,7 +127,11 @@ public class JobSubmitTest {
 			jg.addBlob(key2);
 
 			// submit the job
-			Future<Object> submitFuture = jobManager.ask(new JobManagerMessages.SubmitJob(jg, false), timeout);
+			Future<Object> submitFuture = jmGateway.ask(
+					new JobManagerMessages.SubmitJob(
+							jg,
+							ListeningBehaviour.EXECUTION_RESULT),
+					timeout);
 			try {
 				Await.result(submitFuture, timeout);
 			}
@@ -152,7 +170,11 @@ public class JobSubmitTest {
 			JobGraph jg = new JobGraph("test job", jobVertex);
 
 			// submit the job
-			Future<Object> submitFuture = jobManager.ask(new JobManagerMessages.SubmitJob(jg, false), timeout);
+			Future<Object> submitFuture = jmGateway.ask(
+					new JobManagerMessages.SubmitJob(
+							jg,
+							ListeningBehaviour.EXECUTION_RESULT),
+					timeout);
 			try {
 				Await.result(submitFuture, timeout);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
new file mode 100644
index 0000000..e6067d0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.apache.curator.test.TestingCluster;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.LeaderElectionUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests the ZooKeeper-based leader election of the JobManager: a single JobManager has to be
+ * granted leadership, and a second JobManager has to take over once the current leader dies.
+ *
+ * <p>All tests share one local three-node ZooKeeper ensemble ({@link TestingCluster}).
+ */
+public class JobManagerLeaderElectionTest extends TestLogger {
+
+	private static ActorSystem actorSystem;
+	private static TestingCluster testingCluster;
+
+	/** Timeout of the ask that waits for the leadership notification. */
+	private static final Timeout timeout = new Timeout(TestingUtils.TESTING_DURATION());
+
+	/** Maximum time to block on the leadership notification future. */
+	private static final FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		actorSystem = ActorSystem.create("TestingActorSystem");
+
+		testingCluster = new TestingCluster(3);
+		testingCluster.start();
+	}
+
+	@AfterClass
+	public static void teardown() throws Exception {
+		try {
+			if (actorSystem != null) {
+				JavaTestKit.shutdownActorSystem(actorSystem);
+			}
+		} finally {
+			// stop ZooKeeper even if shutting down the actor system failed
+			if (testingCluster != null) {
+				testingCluster.stop();
+			}
+		}
+	}
+
+	/**
+	 * Tests that a single JobManager is elected as the leader by ZooKeeper.
+	 */
+	@Test
+	public void testLeaderElection() throws Exception {
+		final Configuration configuration = createZooKeeperConfiguration();
+
+		ActorRef jm = null;
+
+		try {
+			Props jmProps = createJobManagerProps(configuration);
+
+			jm = actorSystem.actorOf(jmProps);
+
+			awaitLeadership(jm);
+		} finally {
+			TestingUtils.stopActor(jm);
+		}
+	}
+
+	/**
+	 * Tests that a second JobManager is elected as the leader once the previous leader dies.
+	 */
+	@Test
+	public void testLeaderReelection() throws Exception {
+		final Configuration configuration = createZooKeeperConfiguration();
+
+		ActorRef jm = null;
+		ActorRef jm2 = null;
+
+		try {
+			Props jmProps = createJobManagerProps(configuration);
+
+			jm = actorSystem.actorOf(jmProps);
+
+			awaitLeadership(jm);
+
+			Props jmProps2 = createJobManagerProps(configuration);
+
+			jm2 = actorSystem.actorOf(jmProps2);
+
+			jm.tell(PoisonPill.getInstance(), ActorRef.noSender());
+			// jm is terminating now, so the finally block no longer has to stop it
+			jm = null;
+
+			// now the second JobManager should be elected as the leader
+			awaitLeadership(jm2);
+		} finally {
+			// jm is only non-null here if the test failed before the first leader was killed
+			TestingUtils.stopActor(jm);
+			TestingUtils.stopActor(jm2);
+		}
+	}
+
+	/**
+	 * Asks the given JobManager to reply once it has been granted leadership and blocks until
+	 * the reply arrives.
+	 *
+	 * <p>{@code Await.result} is used instead of {@code Await.ready} on purpose: a failed ask
+	 * (e.g. an {@code AskTimeoutException} because leadership was never granted) must fail the
+	 * test. {@code Await.ready} returns normally for failed futures, so it would let the test
+	 * pass even if no leader was ever elected.
+	 *
+	 * @param jobManager the JobManager which is expected to become the leader
+	 * @throws Exception if the ask fails or no reply arrives within {@link #duration}
+	 */
+	private static void awaitLeadership(ActorRef jobManager) throws Exception {
+		Future<Object> leaderFuture = Patterns.ask(
+				jobManager,
+				TestingJobManagerMessages.getNotifyWhenLeader(),
+				timeout);
+
+		Await.result(leaderFuture, duration);
+	}
+
+	/**
+	 * Creates a configuration which enables the ZooKeeper recovery mode and points to the
+	 * testing ZooKeeper ensemble.
+	 */
+	private static Configuration createZooKeeperConfiguration() {
+		Configuration configuration = new Configuration();
+
+		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		configuration.setString(
+				ConfigConstants.ZOOKEEPER_QUORUM_KEY,
+				testingCluster.getConnectString());
+
+		return configuration;
+	}
+
+	/**
+	 * Creates the {@link Props} for a {@link TestingJobManager} which uses the
+	 * {@link LeaderElectionService} created from the given configuration.
+	 */
+	private static Props createJobManagerProps(Configuration configuration) throws Exception {
+		LeaderElectionService leaderElectionService =
+				LeaderElectionUtils.createLeaderElectionService(configuration);
+
+		// the arguments are matched positionally against the TestingJobManager constructor
+		return Props.create(
+				TestingJobManager.class,
+				configuration,
+				TestingUtils.defaultExecutionContext(),
+				new InstanceManager(),
+				new Scheduler(TestingUtils.defaultExecutionContext()),
+				new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
+				ActorRef.noSender(),
+				1,
+				1L,
+				AkkaUtils.getDefaultTimeout(),
+				StreamingMode.BATCH_ONLY,
+				leaderElectionService
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9de4ed3/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
new file mode 100644
index 0000000..8dd380e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that a {@link LeaderElectionRetrievalTestingCluster} cleans up its state when the
+ * leadership changes: the running job has to be removed and the TaskManagers have to
+ * reconnect to the (new) leader.
+ *
+ * <p>Leadership is granted and leader retrieval listeners are notified explicitly through the
+ * testing cluster, so each test can inform only the JobManagers, only the TaskManagers, or both.
+ */
+public class LeaderChangeStateCleanupTest extends TestLogger {
+
+	private static final FiniteDuration timeout = TestingUtils.TESTING_DURATION();
+
+	private final int numJMs = 2;
+	private final int numTMs = 2;
+	private final int numSlotsPerTM = 2;
+	private final int parallelism = numTMs * numSlotsPerTM;
+
+	private Configuration configuration;
+	private LeaderElectionRetrievalTestingCluster cluster = null;
+
+	// creating the job also sets the BlockingOnceReceiver's blocking flag to true
+	private final JobGraph job = createBlockingJob(parallelism);
+
+	@Before
+	public void before() throws Exception {
+		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
+
+		configuration = new Configuration();
+		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
+		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
+
+		cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, StreamingMode.BATCH_ONLY);
+
+		// TaskManagers don't have to register at the JobManager at this point
+		cluster.start(false);
+
+		// we only wait until all actors are alive
+		cluster.waitForActorsToBeAlive();
+	}
+
+	@After
+	public void after() {
+		if (cluster != null) {
+			cluster.stop();
+		}
+	}
+
+	/**
+	 * Tests that a job is properly canceled in the case of a leader change. In such an event all
+	 * TaskManagers have to disconnect from the previous leader and connect to the newly elected
+	 * leader.
+	 */
+	@Test
+	public void testStateCleanupAfterNewLeaderElectionAndListenerNotification() throws Exception {
+		UUID leaderSessionID1 = UUID.randomUUID();
+		UUID leaderSessionID2 = UUID.randomUUID();
+
+		Future<Object> jobRemoval = submitBlockingJobUnderLeader(0, leaderSessionID1);
+
+		// make JM(1) the new leader and notify all listeners about the event
+		electLeader(1, leaderSessionID2);
+
+		Await.ready(jobRemoval, timeout);
+
+		cluster.waitForTaskManagersToBeRegistered();
+
+		// check that all TMs have registered at the new leader
+		ActorGateway newLeader = cluster.getLeaderGateway(timeout);
+		Future<Object> futureNumberSlots = newLeader.ask(JobManagerMessages.getRequestTotalNumberOfSlots(), timeout);
+		int numberSlots = (Integer) Await.result(futureNumberSlots, timeout);
+		assertEquals(parallelism, numberSlots);
+
+		// resubmit the now non-blocking job, it should complete successfully
+		resubmitNonBlockingJobAndWait();
+	}
+
+	/**
+	 * Tests that a job is properly canceled in the case of a leader change. However, this time only the
+	 * JMs are notified about the leader change and the TMs still believe the old leader to have
+	 * leadership.
+	 */
+	@Test
+	public void testStateCleanupAfterNewLeaderElection() throws Exception {
+		UUID leaderSessionID = UUID.randomUUID();
+		UUID newLeaderSessionID = UUID.randomUUID();
+
+		Future<Object> jobRemoval = submitBlockingJobUnderLeader(0, leaderSessionID);
+
+		// only notify the JMs about the new leader JM(1)
+		cluster.grantLeadership(1, newLeaderSessionID);
+
+		// job should be removed anyway
+		Await.ready(jobRemoval, timeout);
+	}
+
+	/**
+	 * Tests that a job is properly canceled in the event of a leader change. However, this time
+	 * only the TMs are notified about the changing leader. This should be enough to cancel the
+	 * currently running job, though.
+	 */
+	@Test
+	public void testStateCleanupAfterListenerNotification() throws Exception {
+		UUID leaderSessionID = UUID.randomUUID();
+		UUID newLeaderSessionID = UUID.randomUUID();
+
+		Future<Object> jobRemoval = submitBlockingJobUnderLeader(0, leaderSessionID);
+
+		// only notify the listeners (TMs) about the leader change
+		cluster.notifyRetrievalListeners(1, newLeaderSessionID);
+
+		Await.ready(jobRemoval, timeout);
+	}
+
+	/**
+	 * Tests that the same JobManager can be reelected as the leader. Even though the same JM
+	 * is elected as the next leader, all currently running jobs should be canceled properly and
+	 * all TMs should disconnect from the leader and then reconnect to it.
+	 */
+	@Test
+	public void testReelectionOfSameJobManager() throws Exception {
+		UUID leaderSessionID = UUID.randomUUID();
+		UUID newLeaderSessionID = UUID.randomUUID();
+
+		FiniteDuration shortTimeout = new FiniteDuration(20, TimeUnit.SECONDS);
+
+		Future<Object> jobRemoval = submitBlockingJobUnderLeader(0, leaderSessionID);
+
+		// make JM(0) the leader again --> this implies that its leadership is revoked first
+		cluster.grantLeadership(0, newLeaderSessionID);
+
+		Await.ready(jobRemoval, timeout);
+
+		// the TMs must not be able to reconnect since they don't know the new leader session ID
+		try {
+			cluster.waitForTaskManagersToBeRegistered(shortTimeout);
+			fail("TaskManager should not be able to register at JobManager.");
+		} catch (TimeoutException expected) {
+			// expected, since the TMs still use the old leader session ID
+		}
+
+		// notify the TMs about the new (old) leader
+		cluster.notifyRetrievalListeners(0, newLeaderSessionID);
+
+		cluster.waitForTaskManagersToBeRegistered();
+
+		// resubmit the now non-blocking job, it should complete successfully
+		resubmitNonBlockingJobAndWait();
+	}
+
+	/**
+	 * Grants leadership to JM({@code jmIndex}) under the given leader session ID and notifies
+	 * all leader retrieval listeners about it.
+	 */
+	private void electLeader(int jmIndex, UUID leaderSessionID) throws Exception {
+		cluster.grantLeadership(jmIndex, leaderSessionID);
+		cluster.notifyRetrievalListeners(jmIndex, leaderSessionID);
+	}
+
+	/**
+	 * Elects JM({@code jmIndex}) as the leader, waits for the TaskManagers to register, submits
+	 * the blocking job detached and waits for all of its vertices to be running or finished.
+	 *
+	 * <p>NOTE(review): {@code Await.ready} returns normally even if the awaited ask failed (e.g.
+	 * with an {@code AskTimeoutException}). The callers await the returned future the same way,
+	 * so consider {@code Await.result} to let such failures fail the tests.
+	 *
+	 * @return the future of the {@code NotifyWhenJobRemoved} request sent to the leader
+	 */
+	private Future<Object> submitBlockingJobUnderLeader(int jmIndex, UUID leaderSessionID) throws Exception {
+		electLeader(jmIndex, leaderSessionID);
+
+		cluster.waitForTaskManagersToBeRegistered();
+
+		// submit the blocking job so that it is still running when the leadership changes
+		cluster.submitJobDetached(job);
+
+		ActorGateway leader = cluster.getLeaderGateway(timeout);
+
+		Future<Object> allVerticesRunning = leader.ask(
+				new WaitForAllVerticesToBeRunningOrFinished(job.getJobID()), timeout);
+		Await.ready(allVerticesRunning, timeout);
+
+		return leader.ask(new NotifyWhenJobRemoved(job.getJobID()), timeout);
+	}
+
+	/**
+	 * Resets the BlockingOnceReceiver's blocking flag to false and submits the job again,
+	 * waiting for its result.
+	 */
+	private void resubmitNonBlockingJobAndWait() throws Exception {
+		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
+		cluster.submitJobAndWait(job, false, timeout);
+	}
+
+	/**
+	 * Creates a job in which {@code parallelism} senders are connected pointwise to as many
+	 * blocking receivers, all in one slot sharing group. Also sets the BlockingOnceReceiver's
+	 * blocking flag to true.
+	 */
+	public JobGraph createBlockingJob(int parallelism) {
+		Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
+
+		JobVertex senderVertex = new JobVertex("sender");
+		JobVertex receiverVertex = new JobVertex("receiver");
+
+		senderVertex.setInvokableClass(Tasks.Sender.class);
+		senderVertex.setParallelism(parallelism);
+
+		receiverVertex.setInvokableClass(Tasks.BlockingOnceReceiver.class);
+		receiverVertex.setParallelism(parallelism);
+		receiverVertex.connectNewDataSetAsInput(senderVertex, DistributionPattern.POINTWISE);
+
+		SlotSharingGroup sharedGroup = new SlotSharingGroup();
+		senderVertex.setSlotSharingGroup(sharedGroup);
+		receiverVertex.setSlotSharingGroup(sharedGroup);
+
+		return new JobGraph("Blocking test job", senderVertex, receiverVertex);
+	}
+}