-import java.nio.ByteBuffer
-import java.util.Collections
-import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.ConfigConstants
-import org.apache.flink.runtime.FlinkActor
-import org.apache.flink.runtime.jobgraph.JobStatus
-import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus}
-import org.apache.flink.runtime.messages.Messages.Acknowledge
-import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
-import org.apache.flink.yarn.Messages._
-import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.{NMClient, AMRMClient}
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.exceptions.YarnException
-import org.apache.hadoop.yarn.util.Records
-import scala.collection.mutable
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Try
-trait ApplicationMasterActor extends FlinkActor {
-  that: JobManager =>
-  import context._
-  import scala.collection.JavaConverters._
-  val FAST_YARN_HEARTBEAT_DELAY: FiniteDuration = 500 milliseconds
-  val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
-  val YARN_HEARTBEAT_DELAY: FiniteDuration =
-    if(flinkConfiguration.getString(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, null) == null) {
-    } else {
-      FiniteDuration(
-        flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5), SECONDS)
-    }
-  private def env = System.getenv()
-  // indicates if this AM has been started in a detached mode.
-  val detached = java.lang.Boolean.valueOf(env.get(FlinkYarnClient.ENV_DETACHED))
-  var stopWhenJobFinished: JobID = null
-  var rmClientOption: Option[AMRMClient[ContainerRequest]] = None
-  var nmClientOption: Option[NMClient] = None
-  var messageListener:Option[ActorRef] = None
-  var containerLaunchContext: Option[ContainerLaunchContext] = None
-  var runningContainers = 0 // number of currently running containers
-  var failedContainers = 0 // failed container count
-  var numTaskManager = 0 // the requested number of TMs
-  var maxFailedContainers = 0
-  var containersLaunched = 0
-  var numPendingRequests = 0 // number of currently pending container allocation requests.
-  var memoryPerTaskManager = 0
-  // list of containers available for starting
-  var allocatedContainersList: mutable.MutableList[Container] = new mutable.MutableList[Container]
-  var runningContainersList: mutable.MutableList[Container] = new mutable.MutableList[Container]
-  abstract override def handleMessage: Receive = {
-    handleYarnMessage orElse super.handleMessage
-  }
-  def handleYarnMessage: Receive = {
-    case StopYarnSession(status, diag) =>
-"Stopping YARN JobManager with status $status and diagnostic $diag.")
-      instanceManager.getAllRegisteredInstances.asScala foreach {
-        instance =>
-          instance.getActorGateway.tell(StopYarnSession(status, diag))
-      }
-      rmClientOption foreach {
-        rmClient =>
-          Try(rmClient.unregisterApplicationMaster(status, diag, "")).recover{
-            case t: Throwable => log.error("Could not unregister the application master.", t)
-          }
-          Try(rmClient.close()).recover{
-            case t:Throwable => log.error("Could not close the AMRMClient.", t)
-          }
-      }
-      rmClientOption = None
-      nmClientOption foreach {
-        nmClient =>
-        Try(nmClient.close()).recover{
-          case t: Throwable => log.error("Could not close the NMClient.", t)
-        }
-      }
-      nmClientOption = None
-      messageListener foreach {
-          _ ! decorateMessage(JobManagerStopped)
-      }
-      context.system.shutdown()
-    case RegisterClient(client) =>
-"Register ${client.path} as client.")
-      messageListener = Some(client)
-      sender ! decorateMessage(Acknowledge)
-    case UnregisterClient =>
-      messageListener = None
-    case msg: StopAMAfterJob =>
-      val jobId = msg.jobId
-"ApplicatonMaster will shut down YARN session when job $jobId has finished.")
-      stopWhenJobFinished = jobId
-      sender() ! decorateMessage(Acknowledge)
-    case PollYarnClusterStatus =>
-      sender() ! decorateMessage(
-        new FlinkYarnClusterStatus(
-          instanceManager.getNumberOfRegisteredTaskManagers,
-          instanceManager.getTotalNumberOfSlots)
-      )
-    case StartYarnSession(conf, actorSystemPort, webServerPort) =>
-      startYarnSession(conf, actorSystemPort, webServerPort)
-    case jnf: JobNotFound =>
-      log.warn(s"Job with ID ${jnf.jobID} not found in JobManager")
-      if(stopWhenJobFinished == null) {
-        log.warn("The ApplicationMaster didn't expect to receive this message")
-      }
-    case jobStatus: CurrentJobStatus =>
-      if(stopWhenJobFinished == null) {
-        log.warn(s"Received job status $jobStatus which wasn't requested.")
-      } else {
-        if(stopWhenJobFinished != jobStatus.jobID) {
-          log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
-            s"job $stopWhenJobFinished")
-        } else {
-          if(jobStatus.status.isTerminalState) {
-  "Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " +
-              s"Shutting down YARN session")
-            if (jobStatus.status == JobStatus.FINISHED) {
-              self ! decorateMessage(
-                StopYarnSession(
-                  FinalApplicationStatus.SUCCEEDED,
-                  s"The monitored job with ID ${jobStatus.jobID} has finished.")
-              )
-            } else {
-              self ! decorateMessage(
-                StopYarnSession(
-                  FinalApplicationStatus.FAILED,
-                  s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
-              )
-            }
-          } else {
-            log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
-          }
-        }
-      }
-    case HeartbeatWithYarn =>
-      // piggyback on the YARN heartbeat to check if the job has finished
-      if(stopWhenJobFinished != null) {
-        self ! decorateMessage(RequestJobStatus(stopWhenJobFinished))
-      }
-      rmClientOption match {
-        case Some(rmClient) =>
-          log.debug("Send heartbeat to YARN")
-          val response = rmClient.allocate(runningContainers.toFloat / numTaskManager)
-          // ---------------------------- handle YARN responses -------------
-          // get new containers from YARN
-          for (container <- response.getAllocatedContainers.asScala) {
-  "Got new container for allocation: ${container.getId}")
-            allocatedContainersList += container
-            if(numPendingRequests > 0) {
-              numPendingRequests -= 1
-            }
-          }
-          // get failed containers (returned containers are also completed, so we have to
-          // distinguish if it was running before).
-          for (status <- response.getCompletedContainersStatuses.asScala) {
-  "Container ${status.getContainerId} is completed " +
-              s"with diagnostics: ${status.getDiagnostics}")
-            // remove failed container from running containers
-            runningContainersList = runningContainersList.filter(runningContainer => {
-              val wasRunningContainer = runningContainer.getId.equals(status.getContainerId)
-              if(wasRunningContainer) {
-                failedContainers += 1
-                runningContainers -= 1
-      "Container ${status.getContainerId} was a running container. " +
-                  s"Total failed containers $failedContainers.")
-                val detail = status.getExitStatus match {
-                  case -103 => "Vmem limit exceeded";
-                  case -104 => "Pmem limit exceeded";
-                  case _ => ""
-                }
-                messageListener foreach {
-                  _ ! decorateMessage(
-                    YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
-                    s"state=${status.getState}.\n${status.getDiagnostics} $detail")
-                  )
-                }
-              }
-              // return
-              !wasRunningContainer
-            })
-          }
-          // return containers if the RM wants them and we haven't allocated them yet.
-          val preemptionMessage = response.getPreemptionMessage
-          if(preemptionMessage != null) {
-  "Received preemtion message from YARN $preemptionMessage.")
-            val contract = preemptionMessage.getContract
-            if(contract != null) {
-              tryToReturnContainers(contract.getContainers.asScala)
-            }
-            val strictContract = preemptionMessage.getStrictContract
-            if(strictContract != null) {
-              tryToReturnContainers(strictContract.getContainers.asScala)
-            }
-          }
-          // ---------------------------- decide if we need to do anything ---------
-          // check if we want to start some of our allocated containers.
-          if(runningContainers < numTaskManager) {
-            var missingContainers = numTaskManager - runningContainers
-  "The user requested $numTaskManager containers, $runningContainers " +
-              s"running. $missingContainers containers missing")
-            // not enough containers running
-            if(allocatedContainersList.size > 0) {
-    "${allocatedContainersList.size} containers already allocated by YARN. " +
-                "Starting...")
-              // we have some containers allocated to us --> start them
-              allocatedContainersList = allocatedContainersList.dropWhile(container => {
-                if (missingContainers <= 0) {
-                  require(missingContainers == 0, "The variable can not be negative. Illegal state")
-                  false
-                } else {
-                  // start the container
-                  nmClientOption match {
-                    case Some(nmClient) =>
-                      containerLaunchContext match {
-                        case Some(ctx) => {
-                          try {
-                            nmClient.startContainer(container, ctx)
-                            runningContainers += 1
-                            missingContainers -= 1
-                            val message = s"Launching container $containersLaunched " +
-                              s"(${container.getId} on host ${container.getNodeId.getHost})."
-                            containersLaunched += 1
-                            runningContainersList += container
-                            messageListener foreach {
-                              _ ! decorateMessage(YarnMessage(message))
-                            }
-                          } catch {
-                            case e: YarnException =>
-                              log.error("Exception while starting YARN container", e)
-                          }
-                        }
-                        case None =>
-                          log.error("The ContainerLaunchContext was not set.")
-                          self ! decorateMessage(
-                            StopYarnSession(
-                              FinalApplicationStatus.FAILED,
-                              "Fatal error in AM: The ContainerLaunchContext was not set.")
-                          )
-                      }
-                    case None =>
-                      log.error("The NMClient was not set.")
-                      self ! decorateMessage(
-                        StopYarnSession(
-                          FinalApplicationStatus.FAILED,
-                          "Fatal error in AM: The NMClient was not set.")
-                      )
-                  }
-                  // dropping condition
-                  true
-                }
-              })
-            }
-            // if there are still containers missing, request them from YARN
-            val toAllocateFromYarn = Math.max(missingContainers - numPendingRequests, 0)
-            if(toAllocateFromYarn > 0) {
-              val reallocate = flinkConfiguration
-                .getBoolean(ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS, true)
-    "There are $missingContainers containers missing." +
-                s" $numPendingRequests are already requested. " +
-                s"Requesting $toAllocateFromYarn additional container(s) from YARN. " +
-                s"Reallocation of failed containers is enabled=$reallocate " +
-                s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')")
-              // there are still containers missing. Request them from YARN
-              if(reallocate) {
-                for(i <- 1 to toAllocateFromYarn) {
-                  val containerRequest = getContainerRequest(memoryPerTaskManager)
-                  rmClient.addContainerRequest(containerRequest)
-                  numPendingRequests += 1
-        "Requested additional container from YARN. Pending requests " +
-                    s"$numPendingRequests.")
-                }
-              }
-            }
-          }
-          if(runningContainers >= numTaskManager && allocatedContainersList.size > 0) {
-  "Flink has ${allocatedContainersList.size} allocated containers which " +
-              s"are not needed right now. Returning them")
-            for(container <- allocatedContainersList) {
-              rmClient.releaseAssignedContainer(container.getId)
-            }
-            allocatedContainersList.clear()
-          }
-          // maxFailedContainers == -1 is infinite number of retries.
-          if(maxFailedContainers != -1 && failedContainers >= maxFailedContainers) {
-            val msg = s"Stopping YARN session because the number of failed " +
-              s"containers ($failedContainers) exceeded the maximum failed container " +
-              s"count ($maxFailedContainers). This number is controlled by " +
-              s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " +
-              s"setting. By default its the number of requested containers"
-            log.error(msg)
-            self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
-          }
-          // schedule next heartbeat:
-          if (runningContainers < numTaskManager) {
-            // we don't have the requested number of containers. Do fast polling
-            context.system.scheduler.scheduleOnce(
-              self,
-              decorateMessage(HeartbeatWithYarn))
-          } else {
-            // everything is good, slow down polling
-            context.system.scheduler.scheduleOnce(
-              YARN_HEARTBEAT_DELAY,
-              self,
-              decorateMessage(HeartbeatWithYarn))
-          }
-        case None =>
-          log.error("The AMRMClient was not set.")
-          self ! decorateMessage(
-            StopYarnSession(
-              FinalApplicationStatus.FAILED,
-              "Fatal error in AM: AMRMClient was not set")
-          )
-      }
-      log.debug(s"Processed Heartbeat with RMClient. Running containers $runningContainers, " +
-        s"failed containers $failedContainers, " +
-        s"allocated containers ${allocatedContainersList.size}.")
-  }
-  private def runningContainerIds(): mutable.MutableList[ContainerId] = {
-    runningContainersList map { runningCont => runningCont.getId}
-  }
-  private def allocatedContainerIds(): mutable.MutableList[ContainerId] = {
-    allocatedContainersList map { runningCont => runningCont.getId}
-  }
-  private def startYarnSession(
-      conf: Configuration,
-      actorSystemPort: Int,
-      webServerPort: Int)
-    : Unit = {
-    Try {
-"Start yarn session.")
-      memoryPerTaskManager = env.get(FlinkYarnClient.ENV_TM_MEMORY).toInt
-      val memoryLimit = Utils.calculateHeapSize(memoryPerTaskManager, flinkConfiguration)
-      val applicationMasterHost = env.get(Environment.NM_HOST.key)
-      require(applicationMasterHost != null, s"Application master (${Environment.NM_HOST} not set.")
-      val yarnExpiryInterval: FiniteDuration = FiniteDuration(
-        conf.getInt(
-          YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
-          YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS),
-      if(YARN_HEARTBEAT_DELAY.gteq(yarnExpiryInterval)) {
-        log.warn(s"The heartbeat interval of the Flink Application master " +
-          s"($YARN_HEARTBEAT_DELAY) is greater than YARN's expiry interval " +
-          s"($yarnExpiryInterval). The application is likely to be killed by YARN.")
-      }
-      numTaskManager = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt
-      maxFailedContainers = flinkConfiguration.
-        getInteger(ConfigConstants.YARN_MAX_FAILED_CONTAINERS, numTaskManager)
-"Requesting $numTaskManager TaskManagers. Tolerating $maxFailedContainers failed " +
-        "TaskManagers")
-      val remoteFlinkJarPath = env.get(FlinkYarnClient.FLINK_JAR_PATH)
-      val fs = FileSystem.get(conf)
-      val appId = env.get(FlinkYarnClient.ENV_APP_ID)
-      val currDir = env.get(Environment.PWD.key())
-      val clientHomeDir = env.get(FlinkYarnClient.ENV_CLIENT_HOME_DIR)
-      val shipListString = env.get(FlinkYarnClient.ENV_CLIENT_SHIP_FILES)
-      val yarnClientUsername = env.get(FlinkYarnClient.ENV_CLIENT_USERNAME)
-      val rm = AMRMClient.createAMRMClient[ContainerRequest]()
-      rm.init(conf)
-      rm.start()
-      rmClientOption = Some(rm)
-      val nm = NMClient.createNMClient()
-      nm.init(conf)
-      nm.start()
-      nm.cleanupRunningContainersOnStop(true)
-      nmClientOption = Some(nm)
-      // Register with ResourceManager
-      val url = s"http://$applicationMasterHost:$webServerPort"
-"Registering ApplicationMaster with tracking url $url.")
-      rm.registerApplicationMaster(applicationMasterHost, actorSystemPort, url)
-      // Make container requests to ResourceManager
-      for (i <- 0 until numTaskManager) {
-        val containerRequest = getContainerRequest(memoryPerTaskManager)
-"Requesting initial TaskManager container $i.")
-        numPendingRequests += 1
-        // these are initial requests. The reallocation setting doesn't affect this.
-        rm.addContainerRequest(containerRequest)
-      }
-      val flinkJar = Records.newRecord(classOf[LocalResource])
-      val flinkConf = Records.newRecord(classOf[LocalResource])
-      // register Flink Jar with remote HDFS
-      val remoteJarPath = new Path(remoteFlinkJarPath)
-      Utils.registerLocalResource(fs, remoteJarPath, flinkJar)
-      // register conf with local fs
-      Utils.setupLocalResource(conf, fs, appId, new Path(s"file://$currDir/flink-conf-modified" +
-        s".yaml"), flinkConf, new Path(clientHomeDir))
-"Prepared local resource for modified yaml: $flinkConf")
-      val hasLogback = new File(s"$currDir/logback.xml").exists()
-      val hasLog4j = new File(s"$currDir/").exists()
-      // prepare files to be shipped
-      val resources = shipListString.split(",") flatMap {
-        pathStr =>
-          if (pathStr.isEmpty) {
-            None
-          } else {
-            val resource = Records.newRecord(classOf[LocalResource])
-            val path = new Path(pathStr)
-            Utils.registerLocalResource(fs, path, resource)
-            Some((path.getName, resource))
-          }
-      } toList
-      val taskManagerLocalResources = ("flink.jar", flinkJar) ::("flink-conf.yaml",
-        flinkConf) :: resources toMap
-      runningContainers = 0
-      failedContainers = 0
-      val hs = ApplicationMaster.hasStreamingMode(env)
-      containerLaunchContext = Some(
-        createContainerLaunchContext(
-          memoryLimit,
-          hasLogback,
-          hasLog4j,
-          yarnClientUsername,
-          conf,
-          taskManagerLocalResources,
-          hs)
-      )
-      context.system.scheduler.scheduleOnce(
-        self,
-        decorateMessage(HeartbeatWithYarn))
-    } recover {
-      case t: Throwable =>
-        log.error("Could not start yarn session.", t)
-        self ! decorateMessage(
-          StopYarnSession(
-            FinalApplicationStatus.FAILED,
-            s"ApplicationMaster failed while starting. Exception Message: ${t.getMessage}")
-        )
-    }
-  }
-  private def tryToReturnContainers(returnRequest: mutable.Set[PreemptionContainer]): Unit = {
-    for(requestedBackContainers <- returnRequest) {
-      allocatedContainersList = allocatedContainersList.dropWhile( container => {
-        val result = requestedBackContainers.getId.equals(container.getId)
-        if(result) {
-"Returning container $container back to ResourceManager.")
-        }
-        result
-      })
-    }
-  }
-  private def getContainerRequest(memoryPerTaskManager: Int): ContainerRequest = {
-    // Priority for worker containers - priorities are intra-application
-    val priority = Records.newRecord(classOf[Priority])
-    priority.setPriority(0)
-    // Resource requirements for worker containers
-    val capability = Records.newRecord(classOf[Resource])
-    capability.setMemory(memoryPerTaskManager)
-    capability.setVirtualCores(1) // hard-code that number (YARN is not accounting for CPUs)
-    new ContainerRequest(capability, null, null, priority)
-  }
-  private def createContainerLaunchContext(
-      memoryLimit: Int,
-      hasLogback: Boolean,
-      hasLog4j: Boolean,
-      yarnClientUsername: String,
-      yarnConf: Configuration,
-      taskManagerLocalResources: Map[String, LocalResource],
-      streamingMode: Boolean)
-    : ContainerLaunchContext = {
-"Create container launch context.")
-    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-    val heapLimit = calculateMemoryLimits(memoryLimit, streamingMode)
-    val javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
-    val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xms${heapLimit}m " +
-      s"-Xmx${heapLimit}m -XX:MaxDirectMemorySize=${memoryLimit}m $javaOpts")
-    if (hasLogback || hasLog4j) {
-      tmCommand ++=
-        s""" -Dlog.file="${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.log""""
-    }
-    if (hasLogback) {
-      tmCommand ++= s" -Dlogback.configurationFile=file:logback.xml"
-    }
-    if (hasLog4j) {
-      tmCommand ++= s""
-    }
-    tmCommand ++= s" ${classOf[YarnTaskManagerRunner].getName} --configDir . 1> " +
-      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stdout.log 2> " +
-      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stderr.log"
-    tmCommand ++= " --streamingMode"
-    if(streamingMode) {
-      tmCommand ++= " streaming"
-    } else {
-      tmCommand ++= " batch"
-    }
-    ctx.setCommands(Collections.singletonList(tmCommand.toString()))
-"Starting TM with command=${tmCommand.toString()}")
-    ctx.setLocalResources(taskManagerLocalResources.asJava)
-    // Setup classpath for container ( = TaskManager )
-    val containerEnv = new java.util.HashMap[String, String]()
-    Utils.setupEnv(yarnConf, containerEnv)
-    containerEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, yarnClientUsername)
-    ctx.setEnvironment(containerEnv)
-    val user = UserGroupInformation.getCurrentUser
-    try {
-      val credentials = user.getCredentials
-      val dob = new DataOutputBuffer()
-      credentials.writeTokenStorageToStream(dob)
-      val securityTokens = ByteBuffer.wrap(dob.getData, 0, dob.getLength)
-      ctx.setTokens(securityTokens)
-    } catch {
-      case t: Throwable =>
-        log.error("Getting current user info failed when trying to launch the container", t)
-    }
-    ctx
-  }
-  /**
-   * Calculate the correct JVM heap memory limit.
-   * @param memoryLimit The maximum memory in megabytes.
-   * @param streamingMode True if this is a streaming cluster.
-   * @return A Tuple2 containing the heap and the offHeap limit in megabytes.
-   */
-  private def calculateMemoryLimits(memoryLimit: Long, streamingMode: Boolean): Long = {
-    val useOffHeap = flinkConfiguration.getBoolean(
-      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)
-    if (useOffHeap && !streamingMode){
-      val fixedOffHeapSize = flinkConfiguration.getLong(
-        ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
-      if (fixedOffHeapSize > 0) {
-        memoryLimit - fixedOffHeapSize
-      } else {
-        val fraction = flinkConfiguration.getFloat(
-          ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-        val offHeapSize = (fraction * memoryLimit).toLong
-        memoryLimit - offHeapSize
-      }
-    } else {
-      memoryLimit
-    }
-  }
+import{FileWriter, BufferedWriter, PrintWriter}
+import org.apache.flink.client.CliFrontend
+import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManagerMode, JobManager}
+import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.flink.runtime.webmonitor.WebMonitor
+import org.apache.flink.yarn.YarnMessages.StartYarnSession
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.slf4j.LoggerFactory
+/** Base class for all application masters. This base class provides functionality to start a
+  * [[JobManager]] implementation in a Yarn container.
+  *
+  * The only functions which have to be overwritten are the getJobManagerClass and
+  * getArchivistClass, which define the actors to be started.
+  *
+  */
+abstract class ApplicationMasterBase {
+  import scala.collection.JavaConverters._
+  val log = LoggerFactory.getLogger(getClass)
+  val CONF_FILE = "flink-conf.yaml"
+  val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
+  val MAX_REGISTRATION_DURATION = "5 minutes"
+  def getJobManagerClass: Class[_ <: JobManager]
+  def getArchivistClass: Class[_ <: MemoryArchivist]
+  def run(args: Array[String]): Unit = {
+    val yarnClientUsername = System.getenv(FlinkYarnClientBase.ENV_CLIENT_USERNAME)
+"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName} " +
+      s"setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}")
+    EnvironmentInformation.logEnvironmentInfo(log, "YARN ApplicationMaster/JobManager", args)
+    EnvironmentInformation.checkJavaVersion()
+    org.apache.flink.runtime.util.SignalHandler.register(log)
+    val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
+    for(token <- UserGroupInformation.getCurrentUser.getTokens.asScala){
+      ugi.addToken(token)
+    }
+    ugi.doAs(new PrivilegedAction[Object] {
+      override def run(): Object = {
+        runAction()
+        null
+      }
+    })
+  }
+  def runAction(): Unit = {
+    var webMonitorOption: Option[WebMonitor] = None
+    var actorSystemOption: Option[ActorSystem] = None
+    try {
+      val env = System.getenv()
+      if (log.isDebugEnabled) {
+        log.debug("All environment variables: " + env.toString)
+      }
+      val currDir = env.get(Environment.PWD.key())
+      require(currDir != null, "Current directory unknown.")
+      val logDirs = env.get(Environment.LOG_DIRS.key())
+      val streamingMode = if(ApplicationMasterBase.hasStreamingMode(env)) {
+"Starting ApplicationMaster/JobManager in streaming mode")
+        StreamingMode.STREAMING
+      } else {
+"Starting ApplicationMaster/JobManager in batch only mode")
+        StreamingMode.BATCH_ONLY
+      }
+      // Note that we use the "ownHostname" given by YARN here, to make sure
+      // we use the hostnames given by YARN consistently throughout akka.
+      // for akka "localhost" and "localhost.localdomain" are different actors.
+      val ownHostname = env.get(Environment.NM_HOST.key())
+      require(ownHostname != null, "Own hostname in YARN not set.")
+      log.debug("Yarn assigned hostname for application master {}.", ownHostname)
+      val taskManagerCount = env.get(FlinkYarnClientBase.ENV_TM_COUNT).toInt
+      val slots = env.get(FlinkYarnClientBase.ENV_SLOTS).toInt
+      val dynamicPropertiesEncodedString = env.get(FlinkYarnClientBase.ENV_DYNAMIC_PROPERTIES)
+      val config = createConfiguration(currDir, dynamicPropertiesEncodedString)
+      // if a web monitor shall be started, set the port to random binding
+      if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
+        config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
+        config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0.
+      }
+      val (actorSystem, jmActor, archivActor, webMonitor) =
+        JobManager.startActorSystemAndJobManagerActors(
+          config,
+          JobManagerMode.CLUSTER,
+          streamingMode,
+          ownHostname,
+          0,
+          getJobManagerClass,
+          getArchivistClass
+        )
+      actorSystemOption = Option(actorSystem)
+      webMonitorOption = webMonitor
+      val address = AkkaUtils.getAddress(actorSystem)
+      val jobManagerPort = address.port.get
+      val akkaHostname =
+      log.debug("Actor system bound hostname {}.", akkaHostname)
+      val webServerPort =
+      // generate configuration file for TaskManagers
+      generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, akkaHostname,
+        jobManagerPort, webServerPort, logDirs, slots, taskManagerCount,
+        dynamicPropertiesEncodedString)
+      val hadoopConfig = new YarnConfiguration();
+      // send "start yarn session" message to YarnJobManager.
+"Starting YARN session on Job Manager.")
+      jmActor ! StartYarnSession(hadoopConfig, webServerPort)
+"Application Master properly initiated. Awaiting termination of actor system.")
+      actorSystem.awaitTermination()
+    }
+    catch {
+      case t: Throwable =>
+        log.error("Error while running the application master.", t)
+        actorSystemOption.foreach {
+          actorSystem =>
+            actorSystem.shutdown()
+            actorSystem.awaitTermination()
+        }
+    }
+    finally {
+      webMonitorOption.foreach {
+        webMonitor =>
+          log.debug("Stopping Job Manager web frontend.")
+          webMonitor.stop()
+      }
+    }
+  }
+  /** Writes the configuration file that is shipped to the TaskManagers of this YARN session.
+    *
+    * The base configuration `$currDir/$CONF_FILE` is copied line by line. Every line containing
+    * [[ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY]] is dropped. Then the following are appended:
+    * the JobManager address and ports, the web log path, the slot and default-parallelism
+    * settings (only if `slots != -1`), the TaskManager registration timeout and all dynamic
+    * properties.
+    *
+    * @param fileName Path of the file to write (created or overwritten)
+    * @param currDir Directory containing the base configuration file
+    * @param ownHostname Hostname written as the JobManager IPC address
+    * @param jobManagerPort JobManager IPC port
+    * @param jobManagerWebPort JobManager web frontend port
+    * @param logDirs Value written as the JobManager web log path
+    * @param slots Slots per TaskManager, or -1 to keep the values of the base configuration
+    * @param taskManagerCount Number of TaskManagers; the default parallelism is
+    *                         `slots * taskManagerCount`
+    * @param dynamicPropertiesEncodedString Encoded dynamic properties, decoded with
+    *                                       `CliFrontend.getDynamicProperties`
+    */
+  def generateConfigurationFile(
+    fileName: String,
+    currDir: String,
+    ownHostname: String,
+    jobManagerPort: Int,
+    jobManagerWebPort: Int,
+    logDirs: String,
+    slots: Int,
+    taskManagerCount: Int,
+    dynamicPropertiesEncodedString: String)
+  : Unit = {
+    log.info("Generate configuration file for application master.")
+    val output = new PrintWriter(new BufferedWriter(new FileWriter(fileName)))
+    try {
+      val baseConfig = Source.fromFile(s"$currDir/$CONF_FILE")
+      try {
+        // The AM's own hostname is appended below, so any line mentioning the key is dropped.
+        for (line <- baseConfig.getLines()
+             if !line.contains(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)) {
+          output.println(line)
+        }
+      } finally {
+        // the source keeps the file open until it is closed explicitly
+        baseConfig.close()
+      }
+      output.println(s"${ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY}: $ownHostname")
+      output.println(s"${ConfigConstants.JOB_MANAGER_IPC_PORT_KEY}: $jobManagerPort")
+      output.println(s"${ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY}: $logDirs")
+      output.println(s"${ConfigConstants.JOB_MANAGER_WEB_PORT_KEY}: $jobManagerWebPort")
+      if (slots != -1) {
+        output.println(s"${ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS}: $slots")
+        output.println(
+          s"${ConfigConstants.DEFAULT_PARALLELISM_KEY}: ${slots * taskManagerCount}")
+      }
+      // NOTE(review): the value is assumed to be MAX_REGISTRATION_DURATION, expected to be
+      // defined next to CONF_FILE in the enclosing scope; confirm.
+      output.println(s"${ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION}: " +
+        s"$MAX_REGISTRATION_DURATION")
+      // add dynamic properties
+      val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
+      import scala.collection.JavaConverters._
+      for (property <- dynamicProperties.asScala) {
+        output.println(s"${property.f0}: ${property.f1}")
+      }
+    } finally {
+      // close (and flush) the writer even if reading the base configuration failed
+      output.close()
+    }
+  }
+  /** Loads the Flink configuration from `curDir` and applies the dynamic properties on top.
+    *
+    * @param curDir Directory passed to `GlobalConfiguration.loadConfiguration`; also stored
+    *               under [[ConfigConstants.FLINK_BASE_DIR_PATH_KEY]]
+    * @param dynamicPropertiesEncodedString Encoded dynamic properties, decoded with
+    *                                       `CliFrontend.getDynamicProperties`; each one is set
+    *                                       after loading and therefore overrides the file value
+    * @return The loaded configuration with the base dir and dynamic properties applied
+    */
+  def createConfiguration(curDir: String, dynamicPropertiesEncodedString: String): Configuration = {
+    log.info(s"Loading config from: $curDir.")
+    GlobalConfiguration.loadConfiguration(curDir)
+    val configuration = GlobalConfiguration.getConfiguration()
+    configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, curDir)
+    // add dynamic properties to JobManager configuration.
+    val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
+    import scala.collection.JavaConverters._
+    for (property <- dynamicProperties.asScala) {
+      configuration.setString(property.f0, property.f1)
+    }
+    configuration
+  }
+object ApplicationMasterBase {
+  /** Tells whether the YARN session was requested in streaming mode.
+    *
+    * @param env Environment variables of this process
+    * @return The value of `FlinkYarnClientBase.ENV_STREAMING_MODE` parsed with `toBoolean`, or
+    *         false when the variable is not set. `toBoolean` throws an
+    *         IllegalArgumentException for values other than "true"/"false" (case-insensitive).
+    */
+  def hasStreamingMode(env: java.util.Map[String, String]): Boolean =
+    Option(env.get(FlinkYarnClientBase.ENV_STREAMING_MODE)).exists(_.toBoolean)
-import java.util.Date
-import org.apache.flink.api.common.JobID
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-object Messages {
-  case class YarnMessage(message: String, date: Date = new Date())
-  case class ApplicationMasterStatus(numTaskManagers: Int, numSlots: Int)
-  case class RegisterClient(client: ActorRef)
-  case object UnregisterClient
-  case class StopYarnSession(status: FinalApplicationStatus, diagnostics: String)
-  case object JobManagerStopped
-  case class StartYarnSession(
-      configuration: Configuration,
-      actorSystemPort: Int,
-      webServerport: Int)
-  case class JobManagerActorRef(jobManager: ActorRef)
-  case object HeartbeatWithYarn
-  case object PollYarnClusterStatus // see org.apache.flink.runtime.yarn.FlinkYarnClusterStatus for
-                                    // the response
-  case object CheckForUserCommand
-  case class StopAMAfterJob(jobId:JobID) // tell the AM to monitor the job and stop once it has
-    // finished.
-  // Client-local messages
-  case class LocalRegisterClient(jobManagerAddress: InetSocketAddress)
-  case object LocalUnregisterClient
-  case object LocalGetYarnMessage // request new message
-  case object LocalGetYarnClusterStatus // request the latest cluster status
-  def getLocalGetYarnMessage(): AnyRef = {
-    LocalGetYarnMessage
-  }
-  def getLocalUnregisterClient(): AnyRef = {
-    LocalUnregisterClient
-  }
+import java.lang.reflect.Method
+import java.nio.ByteBuffer
+import java.util.Collections
+import java.util.{List => JavaList}
+import grizzled.slf4j.Logger
+import org.apache.flink.api.common.JobID
+import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus,
+  JobNotFound}
+import org.apache.flink.runtime.messages.Messages.Acknowledge
+import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
+import org.apache.flink.yarn.YarnMessages._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.{NMClient, AMRMClient}
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.YarnException
+import org.apache.hadoop.yarn.util.Records
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Try
+/** JobManager actor for execution on Yarn. It enriches the [[JobManager]] with additional messages
+  * to start/administer/stop the Yarn session.
+  *
+  * @param flinkConfiguration Configuration object for the actor
+  * @param executionContext Execution context which is used to execute concurrent tasks in the
+  *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param instanceManager Instance manager to manage the registered
+  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
+  * @param scheduler Scheduler to schedule Flink jobs
+  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param archive Archive for finished Flink jobs
+  * @param defaultExecutionRetries Number of default execution retries
+  * @param delayBetweenRetries Delay between retries
+  * @param timeout Timeout for futures
+  * @param mode StreamingMode in which the system shall be started
+  * @param leaderElectionService LeaderElectionService to participate in the leader election
+  */
+class YarnJobManager(
+    flinkConfiguration: FlinkConfiguration,
+    executionContext: ExecutionContext,
+    instanceManager: InstanceManager,
+    scheduler: FlinkScheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    defaultExecutionRetries: Int,
+    delayBetweenRetries: Long,
+    timeout: FiniteDuration,
+    mode: StreamingMode,
+    leaderElectionService: LeaderElectionService)
+  extends JobManager(
+    flinkConfiguration,
+    executionContext,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    archive,
+    defaultExecutionRetries,
+    delayBetweenRetries,
+    timeout,
+    mode,
+    leaderElectionService) {
+  import context._
+  import scala.collection.JavaConverters._
+  /** Short heartbeat interval, intended for fast polling while containers are missing. */
+  val FAST_YARN_HEARTBEAT_DELAY: FiniteDuration = 500 milliseconds
+  /** Heartbeat interval used when `YARN_HEARTBEAT_DELAY_SECONDS` is not configured. */
+  val DEFAULT_YARN_HEARTBEAT_DELAY: FiniteDuration = 5 seconds
+  /** Regular heartbeat interval: the configured number of seconds of
+    * [[ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS]], or [[DEFAULT_YARN_HEARTBEAT_DELAY]]
+    * if that key is not set.
+    */
+  val YARN_HEARTBEAT_DELAY: FiniteDuration =
+    if(flinkConfiguration.getString(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, null) == null) {
+      DEFAULT_YARN_HEARTBEAT_DELAY
+    } else {
+      FiniteDuration(
+        flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5), SECONDS)
+    }
+  /** Class whose name is used as the main class in the TaskManager launch command. */
+  val taskManagerRunnerClass: Class[_] = classOf[YarnTaskManagerRunner]
+  private def env = System.getenv()
+  // indicates if this AM has been started in a detached mode.
+  val detached = java.lang.Boolean.valueOf(env.get(FlinkYarnClientBase.ENV_DETACHED))
+  // job whose terminal state ends the YARN session (set via StopAMAfterJob); null if none
+  var stopWhenJobFinished: JobID = null
+  // YARN clients: set in startYarnSession, reset to None when the session is stopped
+  var rmClientOption: Option[AMRMClient[ContainerRequest]] = None
+  var nmClientOption: Option[NMClient] = None
+  // registered application client; receives YarnMessages and JobManagerStopped
+  var messageListener: Option[ActorRef] = None
+  // launch context used for every TaskManager container
+  var containerLaunchContext: Option[ContainerLaunchContext] = None
+  var runningContainers = 0 // number of currently running containers
+  var failedContainers = 0 // failed container count
+  var numTaskManagers = 0 // the requested number of TMs
+  var maxFailedContainers = 0 // -1 means an unlimited number of failed containers
+  var numPendingRequests = 0 // number of currently pending container allocation requests.
+  var memoryPerTaskManager = 0
+  // list of containers available for starting
+  var allocatedContainersList = List[Container]()
+  // containers that currently run a TaskManager
+  var runningContainersList = List[Container]()
+  /** Gives the YARN-specific handler the first chance at every message and falls back to the
+    * regular [[JobManager]] handler for everything it does not match.
+    */
+  override def handleMessage: Receive = handleYarnMessage.orElse(super.handleMessage)
+  /** Handles the YARN-specific messages of the application master.
+    *
+    * These are: stopping the session, registering and unregistering the application client,
+    * monitoring a job for `StopAMAfterJob`, cluster status polls, starting the session, and the
+    * periodic `HeartbeatWithYarn`. The heartbeat allocates, starts and returns containers and
+    * reschedules itself.
+    */
+  def handleYarnMessage: Receive = {
+    case StopYarnSession(status, diag) =>
+      log.info(s"Stopping YARN JobManager with status $status and diagnostic $diag.")
+      instanceManager.getAllRegisteredInstances.asScala foreach {
+        instance =>
+          instance.getActorGateway.tell(StopYarnSession(status, diag))
+      }
+      rmClientOption foreach {
+        rmClient =>
+          Try(rmClient.unregisterApplicationMaster(status, diag, "")).recover{
+            case t: Throwable => log.error("Could not unregister the application master.", t)
+          }
+          Try(rmClient.close()).recover{
+            case t: Throwable => log.error("Could not close the AMRMClient.", t)
+          }
+      }
+      rmClientOption = None
+      nmClientOption foreach {
+        nmClient =>
+          Try(nmClient.close()).recover{
+            case t: Throwable => log.error("Could not close the NMClient.", t)
+          }
+      }
+      nmClientOption = None
+      messageListener foreach {
+        _ ! decorateMessage(JobManagerStopped)
+      }
+      context.system.shutdown()
+    case RegisterApplicationClient =>
+      val client = sender()
+      log.info(s"Register ${client.path} as client.")
+      messageListener = Some(client)
+      client ! decorateMessage(AcknowledgeApplicationClientRegistration)
+    case UnregisterClient =>
+      messageListener = None
+    case msg: StopAMAfterJob =>
+      val jobId = msg.jobId
+      log.info(s"ApplicatonMaster will shut down YARN session when job $jobId has finished.")
+      stopWhenJobFinished = jobId
+      sender() ! decorateMessage(Acknowledge)
+    case PollYarnClusterStatus =>
+      sender() ! decorateMessage(
+        new FlinkYarnClusterStatus(
+          instanceManager.getNumberOfRegisteredTaskManagers,
+          instanceManager.getTotalNumberOfSlots)
+      )
+    case StartYarnSession(hadoopConfig, webServerPort) =>
+      startYarnSession(hadoopConfig, webServerPort)
+    case jnf: JobNotFound =>
+      log.warn(s"Job with ID ${jnf.jobID} not found in JobManager")
+      if(stopWhenJobFinished == null) {
+        log.warn("The ApplicationMaster didn't expect to receive this message")
+      }
+    case jobStatus: CurrentJobStatus =>
+      if(stopWhenJobFinished == null) {
+        log.warn(s"Received job status $jobStatus which wasn't requested.")
+      } else {
+        if(stopWhenJobFinished != jobStatus.jobID) {
+          log.warn(s"Received job status for job ${jobStatus.jobID} but expected status for " +
+            s"job $stopWhenJobFinished")
+        } else {
+          if(jobStatus.status.isTerminalState) {
+            log.info(s"Job with ID ${jobStatus.jobID} is in terminal state " +
+              s"${jobStatus.status}. Shutting down YARN session")
+            if (jobStatus.status == JobStatus.FINISHED) {
+              self ! decorateMessage(
+                StopYarnSession(
+                  FinalApplicationStatus.SUCCEEDED,
+                  s"The monitored job with ID ${jobStatus.jobID} has finished.")
+              )
+            } else {
+              self ! decorateMessage(
+                StopYarnSession(
+                  FinalApplicationStatus.FAILED,
+                  s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
+              )
+            }
+          } else {
+            log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}")
+          }
+        }
+      }
+    case HeartbeatWithYarn =>
+      // piggyback on the YARN heartbeat to check if the job has finished
+      if(stopWhenJobFinished != null) {
+        self ! decorateMessage(RequestJobStatus(stopWhenJobFinished))
+      }
+      rmClientOption match {
+        case Some(rmClient) =>
+          log.debug("Send heartbeat to YARN")
+          // keep the reported progress in [0, 1]: 0 / 0 would be NaN, and containers retrieved
+          // from previous attempts can push the ratio above 1
+          val progress =
+            if (numTaskManagers > 0) {
+              math.min(1f, runningContainers.toFloat / numTaskManagers)
+            } else {
+              1f
+            }
+          val response = rmClient.allocate(progress)
+          // ---------------------------- handle YARN responses -------------
+          val newlyAllocatedContainers = response.getAllocatedContainers.asScala
+          newlyAllocatedContainers.foreach {
+            container => log.info(s"Got new container for allocation: ${container.getId}")
+          }
+          allocatedContainersList ++= newlyAllocatedContainers
+          numPendingRequests = math.max(0, numPendingRequests - newlyAllocatedContainers.length)
+          val completedContainerStatuses = response.getCompletedContainersStatuses.asScala
+          val idStatusMap = completedContainerStatuses
+            .map(status => (status.getContainerId, status)).toMap
+          completedContainerStatuses.foreach {
+            status => log.info(s"Container ${status.getContainerId} is completed " +
+              s"with diagnostics: ${status.getDiagnostics}")
+          }
+          // get failed containers (returned containers are also completed, so we have to
+          // distinguish if it was running before).
+          val (completedContainers, remainingRunningContainers) = runningContainersList
+            .partition(container => idStatusMap.contains(container.getId))
+          completedContainers.foreach {
+            container =>
+              val status = idStatusMap(container.getId)
+              failedContainers += 1
+              runningContainers -= 1
+              log.info(s"Container ${status.getContainerId} was a running container. " +
+                s"Total failed containers $failedContainers.")
+              val detail = status.getExitStatus match {
+                case -103 => "Vmem limit exceeded";
+                case -104 => "Pmem limit exceeded";
+                case _ => ""
+              }
+              messageListener foreach {
+                _ ! decorateMessage(
+                  YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
+                    s"state=${status.getState}.\n${status.getDiagnostics} $detail")
+                )
+              }
+          }
+          runningContainersList = remainingRunningContainers
+          // return containers if the RM wants them and we haven't allocated them yet.
+          val preemptionMessage = response.getPreemptionMessage
+          if(preemptionMessage != null) {
+            log.info(s"Received preemtion message from YARN $preemptionMessage.")
+            val contract = preemptionMessage.getContract
+            if(contract != null) {
+              tryToReturnContainers(contract.getContainers.asScala.toSet)
+            }
+            val strictContract = preemptionMessage.getStrictContract
+            if(strictContract != null) {
+              tryToReturnContainers(strictContract.getContainers.asScala.toSet)
+            }
+          }
+          // ---------------------------- decide if we need to do anything ---------
+          // check if we want to start some of our allocated containers.
+          if(runningContainers < numTaskManagers) {
+            val missingContainers = numTaskManagers - runningContainers
+            log.info(s"The user requested $numTaskManagers containers, $runningContainers " +
+              s"running. $missingContainers containers missing")
+            val numStartedContainers = startTMsInAllocatedContainers(missingContainers)
+            // if there are still containers missing, request them from YARN
+            val toAllocateFromYarn = Math.max(
+              missingContainers - numStartedContainers - numPendingRequests,
+              0)
+            if (toAllocateFromYarn > 0) {
+              val reallocate = flinkConfiguration
+                .getBoolean(ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS, true)
+              log.info(s"There are $missingContainers containers missing." +
+                s" $numPendingRequests are already requested. " +
+                s"Requesting $toAllocateFromYarn additional container(s) from YARN. " +
+                s"Reallocation of failed containers is enabled=$reallocate " +
+                s"('${ConfigConstants.YARN_REALLOCATE_FAILED_CONTAINERS}')")
+              // there are still containers missing. Request them from YARN
+              if (reallocate) {
+                for(_ <- 1 to toAllocateFromYarn) {
+                  val containerRequest = getContainerRequest(memoryPerTaskManager)
+                  rmClient.addContainerRequest(containerRequest)
+                  numPendingRequests += 1
+                  log.info(s"Requested additional container from YARN. Pending requests " +
+                    s"$numPendingRequests.")
+                }
+              }
+            }
+          }
+          if(runningContainers >= numTaskManagers && allocatedContainersList.nonEmpty) {
+            log.info(s"Flink has ${allocatedContainersList.size} allocated containers which " +
+              s"are not needed right now. Returning them")
+            for(container <- allocatedContainersList) {
+              rmClient.releaseAssignedContainer(container.getId)
+            }
+            allocatedContainersList = List()
+          }
+          // maxFailedContainers == -1 is infinite number of retries.
+          if(maxFailedContainers != -1 && failedContainers >= maxFailedContainers) {
+            val msg = s"Stopping YARN session because the number of failed " +
+              s"containers ($failedContainers) exceeded the maximum failed container " +
+              s"count ($maxFailedContainers). This number is controlled by " +
+              s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " +
+              s"setting. By default its the number of requested containers"
+            log.error(msg)
+            self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
+          }
+          // schedule next heartbeat: poll fast while containers are missing, slowly otherwise
+          val nextHeartbeatDelay =
+            if (runningContainers < numTaskManagers) {
+              FAST_YARN_HEARTBEAT_DELAY
+            } else {
+              YARN_HEARTBEAT_DELAY
+            }
+          context.system.scheduler.scheduleOnce(
+            nextHeartbeatDelay,
+            self,
+            decorateMessage(HeartbeatWithYarn))
+        case None =>
+          log.error("The AMRMClient was not set.")
+          self ! decorateMessage(
+            StopYarnSession(
+              FinalApplicationStatus.FAILED,
+              "Fatal error in AM: AMRMClient was not set")
+          )
+      }
+      log.debug(s"Processed Heartbeat with RMClient. Running containers $runningContainers, " +
+        s"failed containers $failedContainers, " +
+        s"allocated containers ${allocatedContainersList.size}.")
+  }
+  /** Starts min(numTMsToStart, allocatedContainersList.size) TaskManager in the available
+    * allocated containers. The number of successfully started TaskManagers is returned.
+    *
+    * Containers whose start fails with a [[YarnException]] or an `IOException` are logged. They
+    * are removed from `allocatedContainersList` but not counted as running. If the NMClient
+    * or the launch context is missing, a failing [[StopYarnSession]] is sent to self and 0
+    * is returned.
+    *
+    * @param numTMsToStart Number of TaskManagers to start if enough allocated containers are
+    *                      available. If not, then all allocated containers are used
+    * @return Number of successfully started TaskManagers
+    */
+  private def startTMsInAllocatedContainers(numTMsToStart: Int): Int = {
+    // not enough containers running
+    if (allocatedContainersList.nonEmpty) {
+      log.info(s"${allocatedContainersList.size} containers already allocated by YARN. " +
+        "Starting...")
+      nmClientOption match {
+        case Some(nmClient) =>
+          containerLaunchContext match {
+            case Some(ctx) =>
+              val (containersToBeStarted, remainingContainers) = allocatedContainersList
+                .splitAt(numTMsToStart)
+              val startedContainers = containersToBeStarted.flatMap {
+                container =>
+                  try {
+                    nmClient.startContainer(container, ctx)
+                    val message = s"Launching container (${container.getId} on host " +
+                      s"${container.getNodeId.getHost})."
+                    messageListener foreach {
+                      _ ! decorateMessage(YarnMessage(message))
+                    }
+                    Some(container)
+                  } catch {
+                    // startContainer declares both exception types. Letting an IOException
+                    // escape would abort the loop and lose the bookkeeping for the containers
+                    // that were already started.
+                    case e @ (_: YarnException | _: java.io.IOException) =>
+                      log.error(s"Exception while starting YARN " +
+                        s"container ${container.getId} on " +
+                        s"host ${container.getNodeId.getHost}", e)
+                      None
+                  }
+              }
+              runningContainers += startedContainers.length
+              runningContainersList :::= startedContainers
+              allocatedContainersList = remainingContainers
+              startedContainers.length
+            case None =>
+              log.error("The ContainerLaunchContext was not set.")
+              self ! decorateMessage(
+                StopYarnSession(
+                  FinalApplicationStatus.FAILED,
+                  "Fatal error in AM: The ContainerLaunchContext was not set."))
+              0
+          }
+        case None =>
+          log.error("The NMClient was not set.")
+          self ! decorateMessage(
+            StopYarnSession(
+              FinalApplicationStatus.FAILED,
+              "Fatal error in AM: The NMClient was not set."))
+          0
+      }
+    } else {
+      0
+    }
+  }
+  /** Ids of all containers currently running a TaskManager, in list order. */
+  private def runningContainerIds(): List[ContainerId] =
+    runningContainersList.map(_.getId)
+  /** Ids of all allocated containers that are still available for starting, in list order. */
+  private def allocatedContainerIds(): List[ContainerId] =
+    allocatedContainersList.map(_.getId)
+  /** Starts the Yarn session by connecting to the ResourceManager and the NodeManager. After
+    * a connection has been established, the number of missing containers is requested from Yarn.
+    *
+    * Any non-fatal failure is logged and turned into a [[StopYarnSession]] message with status
+    * FAILED sent to self.
+    *
+    * @param conf Hadoop configuration object
+    * @param webServerPort The port on which the web server is listening
+    */
+  private def startYarnSession(conf: Configuration, webServerPort: Int): Unit = {
+    Try {
+      log.info("Start yarn session.")
+      memoryPerTaskManager = env.get(FlinkYarnClientBase.ENV_TM_MEMORY).toInt
+      val memoryLimit = Utils.calculateHeapSize(memoryPerTaskManager, flinkConfiguration)
+      val applicationMasterHost = env.get(Environment.NM_HOST.key)
+      require(applicationMasterHost != null, s"Application master (${Environment.NM_HOST} not set.")
+      // RM_AM_EXPIRY_INTERVAL_MS is configured in milliseconds
+      val yarnExpiryInterval: FiniteDuration = FiniteDuration(
+        conf.getInt(
+          YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS),
+        MILLISECONDS)
+      if(YARN_HEARTBEAT_DELAY.gteq(yarnExpiryInterval)) {
+        log.warn(s"The heartbeat interval of the Flink Application master " +
+          s"($YARN_HEARTBEAT_DELAY) is greater than YARN's expiry interval " +
+          s"($yarnExpiryInterval). The application is likely to be killed by YARN.")
+      }
+      numTaskManagers = env.get(FlinkYarnClientBase.ENV_TM_COUNT).toInt
+      maxFailedContainers = flinkConfiguration.
+        getInteger(ConfigConstants.YARN_MAX_FAILED_CONTAINERS, numTaskManagers)
+      log.info(s"Yarn session with $numTaskManagers TaskManagers. Tolerating " +
+        s"$maxFailedContainers failed TaskManagers")
+      val remoteFlinkJarPath = env.get(FlinkYarnClientBase.FLINK_JAR_PATH)
+      val fs = FileSystem.get(conf)
+      val appId = env.get(FlinkYarnClientBase.ENV_APP_ID)
+      val currDir = env.get(Environment.PWD.key())
+      val clientHomeDir = env.get(FlinkYarnClientBase.ENV_CLIENT_HOME_DIR)
+      val shipListString = env.get(FlinkYarnClientBase.ENV_CLIENT_SHIP_FILES)
+      val yarnClientUsername = env.get(FlinkYarnClientBase.ENV_CLIENT_USERNAME)
+      val rm = AMRMClient.createAMRMClient[ContainerRequest]()
+      rm.init(conf)
+      rm.start()
+      rmClientOption = Some(rm)
+      val nm = NMClient.createNMClient()
+      nm.init(conf)
+      nm.start()
+      nm.cleanupRunningContainersOnStop(true)
+      nmClientOption = Some(nm)
+      // Register with ResourceManager
+      val url = s"http://$applicationMasterHost:$webServerPort"
+      log.info(s"Registering ApplicationMaster with tracking url $url.")
+      val actorSystemPort = AkkaUtils.getAddress(system).port.getOrElse(-1)
+      val response = rm.registerApplicationMaster(
+        applicationMasterHost,
+        actorSystemPort,
+        url)
+      val containersFromPreviousAttempts = getContainersFromPreviousAttempts(response)
+      log.info(s"Retrieved ${containersFromPreviousAttempts.length} TaskManagers from previous " +
+        s"attempts.")
+      runningContainersList ++= containersFromPreviousAttempts
+      runningContainers = runningContainersList.length
+      // Make missing container requests to ResourceManager
+      runningContainers until numTaskManagers foreach {
+        i =>
+          val containerRequest = getContainerRequest(memoryPerTaskManager)
+          log.info(s"Requesting initial TaskManager container $i.")
+          numPendingRequests += 1
+          // these are initial requests. The reallocation setting doesn't affect this.
+          rm.addContainerRequest(containerRequest)
+      }
+      val flinkJar = Records.newRecord(classOf[LocalResource])
+      val flinkConf = Records.newRecord(classOf[LocalResource])
+      // register Flink Jar with remote HDFS
+      val remoteJarPath = new Path(remoteFlinkJarPath)
+      Utils.registerLocalResource(fs, remoteJarPath, flinkJar)
+      // register conf with local fs
+      Utils.setupLocalResource(conf, fs, appId,
+        new Path(s"file://$currDir/flink-conf-modified.yaml"), flinkConf, new Path(clientHomeDir))
+      log.info(s"Prepared local resource for modified yaml: $flinkConf")
+      val hasLogback = new File(s"$currDir/logback.xml").exists()
+      // check the log4j configuration file itself; the bare directory always exists
+      val hasLog4j = new File(s"$currDir/log4j.properties").exists()
+      // prepare files to be shipped (empty entries of the comma-separated list are skipped)
+      val resources = shipListString.split(",").toList.filter(_.nonEmpty).map {
+        pathStr =>
+          val resource = Records.newRecord(classOf[LocalResource])
+          val path = new Path(pathStr)
+          Utils.registerLocalResource(fs, path, resource)
+          (path.getName, resource)
+      }
+      val taskManagerLocalResources =
+        (("flink.jar", flinkJar) :: ("flink-conf.yaml", flinkConf) :: resources).toMap
+      failedContainers = 0
+      containerLaunchContext = Some(
+        createContainerLaunchContext(
+          memoryLimit,
+          hasLogback,
+          hasLog4j,
+          yarnClientUsername,
+          conf,
+          taskManagerLocalResources,
+          ApplicationMasterBase.hasStreamingMode(env))
+      )
+      // containers are still missing at this point, so start with fast polling
+      context.system.scheduler.scheduleOnce(
+        FAST_YARN_HEARTBEAT_DELAY,
+        self,
+        decorateMessage(HeartbeatWithYarn))
+    } recover {
+      case t: Throwable =>
+        log.error("Could not start yarn session.", t)
+        self ! decorateMessage(
+          StopYarnSession(
+            FinalApplicationStatus.FAILED,
+            s"ApplicationMaster failed while starting. Exception Message: ${t.getMessage}")
+        )
+    }
+  }
+  /** Returns all still living containers from previous application attempts.
+    *
+    * The lookup is delegated to [[RegisterApplicationMasterResponseReflector]]. That object
+    * yields an empty Seq on Hadoop versions whose response lacks
+    * `getContainersFromPreviousAttempts`.
+    *
+    * @param response Response of registering the ApplicationMaster with the ResourceManager
+    * @return Seq of living containers which could be retrieved
+    */
+  private def getContainersFromPreviousAttempts(
+      response: RegisterApplicationMasterResponse): Seq[Container] =
+    RegisterApplicationMasterResponseReflector.getContainersFromPreviousAttempts(response)
+  /** Gives back allocated-but-unused containers that YARN requested in a preemption message.
+    *
+    * Only entries of `allocatedContainersList` whose id appears in `returnRequest` are affected.
+    * They are removed from the list and released via the AMRMClient (if it is set). Containers
+    * that already run a TaskManager are not touched.
+    *
+    * @param returnRequest Containers the ResourceManager wants back
+    */
+  private def tryToReturnContainers(returnRequest: Set[PreemptionContainer]): Unit = {
+    val requestedIds = returnRequest.map(_.getId)
+    // partition scans the whole list; dropWhile would only drop a matching prefix
+    val (toReturn, toKeep) = allocatedContainersList.partition {
+      container => requestedIds.contains(container.getId)
+    }
+    toReturn.foreach {
+      container =>
+        log.info(s"Returning container $container back to ResourceManager.")
+        rmClientOption.foreach(_.releaseAssignedContainer(container.getId))
+    }
+    allocatedContainersList = toKeep
+  }
+  /** Builds a request for one worker (TaskManager) container with the given memory, a single
+    * virtual core and priority 0. No node or rack constraints are set.
+    */
+  private def getContainerRequest(memoryPerTaskManager: Int): ContainerRequest = {
+    // Resource requirements for worker containers; vcores are hard-coded to 1 because
+    // YARN is not accounting for CPUs
+    val capability = Records.newRecord(classOf[Resource])
+    capability.setMemory(memoryPerTaskManager)
+    capability.setVirtualCores(1)
+    // Priority for worker containers - priorities are intra-application
+    val workerPriority = Records.newRecord(classOf[Priority])
+    workerPriority.setPriority(0)
+    new ContainerRequest(capability, null, null, workerPriority)
+  }
+  /** Creates the launch context used for every TaskManager container.
+    *
+    * The context contains:
+    *  - the java command line: heap from [[calculateMemoryLimits]] and direct memory capped at
+    *    `memoryLimit`, with the logging options for logback/log4j when the corresponding file
+    *    exists and the streaming mode flag;
+    *  - the local resources;
+    *  - the container environment;
+    *  - the current user's credentials as security tokens.
+    *
+    * @param memoryLimit Memory in megabytes used for -XX:MaxDirectMemorySize and as input
+    *                    to the heap size calculation
+    * @param hasLogback Whether a logback.xml is available to the containers
+    * @param hasLog4j Whether a log4j.properties is available to the containers
+    * @param yarnClientUsername User name propagated via `FlinkYarnClientBase.ENV_CLIENT_USERNAME`
+    * @param yarnConf Hadoop configuration used to set up the container environment
+    * @param taskManagerLocalResources Local resources (by file name) for each container
+    * @param streamingMode True to start the TaskManagers in streaming mode, false for batch
+    * @return The populated launch context
+    */
+  private def createContainerLaunchContext(
+      memoryLimit: Int,
+      hasLogback: Boolean,
+      hasLog4j: Boolean,
+      yarnClientUsername: String,
+      yarnConf: Configuration,
+      taskManagerLocalResources: Map[String, LocalResource],
+      streamingMode: Boolean)
+    : ContainerLaunchContext = {
+    log.info("Create container launch context.")
+    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+    val heapLimit = calculateMemoryLimits(memoryLimit, streamingMode)
+    val javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
+    val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xms${heapLimit}m " +
+      s"-Xmx${heapLimit}m -XX:MaxDirectMemorySize=${memoryLimit}m $javaOpts")
+    if (hasLogback || hasLog4j) {
+      tmCommand ++=
+        s""" -Dlog.file="${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.log""""
+    }
+    if (hasLogback) {
+      tmCommand ++= s" -Dlogback.configurationFile=file:logback.xml"
+    }
+    if (hasLog4j) {
+      tmCommand ++= s" -Dlog4j.configuration=file:log4j.properties"
+    }
+    tmCommand ++= s" ${taskManagerRunnerClass.getName} --configDir . 1> " +
+      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stdout.log 2> " +
+      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stderr.log"
+    // NOTE(review): these arguments follow the redirections; this relies on the command being
+    // run through a shell, which still passes them to the program.
+    tmCommand ++= " --streamingMode"
+    tmCommand ++= (if (streamingMode) " streaming" else " batch")
+    ctx.setCommands(Collections.singletonList(tmCommand.toString()))
+    log.info(s"Starting TM with command=${tmCommand.toString()}")
+    ctx.setLocalResources(taskManagerLocalResources.asJava)
+    // Setup classpath for container ( = TaskManager )
+    val containerEnv = new java.util.HashMap[String, String]()
+    Utils.setupEnv(yarnConf, containerEnv)
+    containerEnv.put(FlinkYarnClientBase.ENV_CLIENT_USERNAME, yarnClientUsername)
+    ctx.setEnvironment(containerEnv)
+    val user = UserGroupInformation.getCurrentUser
+    // Serialize the user's credentials into the context. On failure the context is returned
+    // without tokens; fatal errors are not swallowed.
+    try {
+      val credentials = user.getCredentials
+      val dob = new DataOutputBuffer()
+      credentials.writeTokenStorageToStream(dob)
+      val securityTokens = ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+      ctx.setTokens(securityTokens)
+    } catch {
+      case scala.util.control.NonFatal(t) =>
+        log.error("Getting current user info failed when trying to launch the container", t)
+    }
+    ctx
+  }
+  /** Calculates the JVM heap size (-Xms/-Xmx) of a TaskManager container.
+    *
+    * The full `memoryLimit` is used, except when off-heap memory is enabled
+    * ([[ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY]]) on a non-streaming cluster. In that
+    * case the off-heap part is subtracted. The off-heap part is either the fixed
+    * [[ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY]] (if > 0) or the configured fraction
+    * ([[ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY]]) of `memoryLimit`.
+    *
+    * @param memoryLimit The maximum memory in megabytes.
+    * @param streamingMode True if this is a streaming cluster.
+    * @return The heap limit in megabytes.
+    */
+  private def calculateMemoryLimits(memoryLimit: Long, streamingMode: Boolean): Long = {
+    val useOffHeap = flinkConfiguration.getBoolean(
+      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)
+    if (useOffHeap && !streamingMode) {
+      val fixedOffHeapSize = flinkConfiguration.getLong(
+        ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
+      if (fixedOffHeapSize > 0) {
+        memoryLimit - fixedOffHeapSize
+      } else {
+        val fraction = flinkConfiguration.getFloat(
+          ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+          ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+        val offHeapSize = (fraction * memoryLimit).toLong
+        memoryLimit - offHeapSize
+      }
+    } else {
+      memoryLimit
+    }
+  }
+/** Singleton object to reflect the getContainersFromPreviousAttempts method for
+  * [[RegisterApplicationMasterResponse]].
+  *
+  * Only Hadoop versions (>= 2.4.0) support
+  * [[RegisterApplicationMasterResponse#getContainersFromPreviousAttempts]]. Therefore, it's checked
+  * at runtime whether the RegisterApplicationMasterResponse supports this method. If not, then an
+  * empty Seq[Container] is returned upon calling getContainersFromPreviousAttempts.
+  *
+  */
+private object RegisterApplicationMasterResponseReflector {
+  val log = Logger(getClass)
+  /** The reflected method, or None if the Hadoop version on the classpath does not provide it.
+    * The lookup is done once, when this object is initialized.
+    */
+  val methodOption: Option[Method] = Try {
+    classOf[RegisterApplicationMasterResponse].getMethod("getContainersFromPreviousAttempts")
+  }.toOption
+  /** Returns the containers of previous application attempts contained in the response.
+    *
+    * @param response Response of the ApplicationMaster registration
+    * @return The containers from previous attempts, or an empty Seq if the method is not
+    *         available or (defensively) if it returns null
+    */
+  def getContainersFromPreviousAttempts(
+      response: RegisterApplicationMasterResponse)
+    : Seq[Container] = {
+    import scala.collection.JavaConverters._
+    // if the method was defined call it, if not, then return an empty List
+    methodOption match {
+      case Some(method) =>
+        log.debug(s"Calling method ${method.getName} of ${response.getClass.getCanonicalName}.")
+        // Option(...) maps a null result to an empty list instead of failing later on iteration
+        Option(method.invoke(response).asInstanceOf[JavaList[Container]])
+          .map(_.asScala.toList)
+          .getOrElse(Nil)
+      case None =>
+        log.debug(s"${response.getClass.getCanonicalName} does not support the method " +
+          "getContainersFromPreviousAttempts. Returning empty list.")
+        List()
+    }
+  }
+}
+import java.util.{UUID, Date}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import scala.concurrent.duration.{Deadline, FiniteDuration}
+/** Messages exchanged by the YARN-specific Flink components. */
+object YarnMessages {
+  case class YarnMessage(message: String, date: Date = new Date())
+  case class ApplicationMasterStatus(numTaskManagers: Int, numSlots: Int)
+  case object UnregisterClient extends RequiresLeaderSessionID
+  case class StopYarnSession(status: FinalApplicationStatus, diagnostics: String)
+    extends RequiresLeaderSessionID
+  case class LocalStopYarnSession(status: FinalApplicationStatus, diagnostics: String)
+  case object JobManagerStopped
+  case class StartYarnSession(config: Configuration, webServerPort: Int)
+  /** Triggers the registration of the ApplicationClient to the YarnJobManager
+    *
+    * @param jobManagerAkkaURL JobManager's Akka URL
+    * @param currentTimeout Timeout for next [[TriggerApplicationClientRegistration]] message
+    * @param deadline Deadline for registration process to finish
+    */
+  case class TriggerApplicationClientRegistration(
+      jobManagerAkkaURL: String,
+      currentTimeout: FiniteDuration,
+      deadline: Option[Deadline]) extends RequiresLeaderSessionID
+  /** Registration message sent from the [[ApplicationClient]] to the [[YarnJobManager]]. A
+    * successful registration is acknowledged with a [[AcknowledgeApplicationClientRegistration]]
+    * message.
+    */
+  case object RegisterApplicationClient extends RequiresLeaderSessionID
+  /** Response to a [[RegisterApplicationClient]] message which led to a successful registration
+    * of the [[ApplicationClient]]
+    */
+  case object AcknowledgeApplicationClientRegistration extends RequiresLeaderSessionID
+  /** Notification message that a new leader has been found. This message is sent from the
+    * [[org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService]]
+    *
+    * @param jobManagerAkkaURL New leader's Akka URL
+    * @param leaderSessionID New leader's session ID
+    */
+  case class JobManagerLeaderAddress(jobManagerAkkaURL: String, leaderSessionID: UUID)
+  case object HeartbeatWithYarn
+  case object PollYarnClusterStatus // see org.apache.flink.runtime.yarn.FlinkYarnClusterStatus for
+                                    // the response
+  case object CheckForUserCommand
+  // tell the AM to monitor the job and stop once it has finished
+  case class StopAMAfterJob(jobId:JobID) extends RequiresLeaderSessionID
+  case class LocalStopAMAfterJob(jobId:JobID)
+  case object LocalGetYarnMessage // request new message
+  case object LocalGetYarnClusterStatus // request the latest cluster status
+  /** Returns the [[LocalGetYarnMessage]] case object (presumably for Java callers, which cannot
+    * reference Scala case objects conveniently).
+    */
+  def getLocalGetYarnMessage(): AnyRef = {
+    LocalGetYarnMessage
+  }
+  /** Returns the [[LocalGetYarnClusterStatus]] case object (presumably for Java callers).
+    * Note the lower-case "y" in the method name, which is kept for compatibility.
+    */
+  def getLocalGetyarnClusterStatus(): AnyRef = {
+    LocalGetYarnClusterStatus
+  }
+}
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, TaskManagerConfiguration, TaskManager}
-import org.apache.flink.yarn.Messages.StopYarnSession
+import org.apache.flink.yarn.YarnMessages.StopYarnSession
 /** An extension of the TaskManager that listens for additional YARN related
   * messages.
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-public class FlinkYarnSessionCliTest {
-	@Rule
-	public TemporaryFolder tmp = new TemporaryFolder();
-	@Test
-	public void testDynamicProperties() throws IOException {
-		Map<String, String> map = new HashMap<String, String>(System.getenv());
-		File tmpFolder = tmp.newFolder();
-		File fakeConf = new File(tmpFolder, "flink-conf.yaml");
-		fakeConf.createNewFile();
-		map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
-		TestBaseUtils.setEnv(map);
-		Options options = new Options();
-		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
-		cli.getYARNSessionCLIOptions(options);
-		CommandLineParser parser = new PosixParser();
-		CommandLine cmd = null;
-		try {
-			cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});
-		} catch(Exception e) {
-			e.printStackTrace();
-"Parsing failed with " + e.getMessage());
-		}
-		AbstractFlinkYarnClient flinkYarnClient = cli.createFlinkYarnClient(cmd);
-		Assert.assertNotNull(flinkYarnClient);
-		List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
-		Assert.assertEquals(1, dynProperties.size());
-		Assert.assertEquals("akka.ask.timeout", dynProperties.get(0).f0);
-		Assert.assertEquals("5 min", dynProperties.get(0).f1);
-	}
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.junit.Assert;
-import org.junit.Test;
-public class UtilsTests {
-	/**
-	 * Remove 15% of the heap, at least 384MB.
-	 *
-	 */
-	@Test
-	public void testHeapCutoff() {
-		Configuration conf = new Configuration();
-		Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
-		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
-		// test different configuration
-		Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf) );
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "1000");
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.1");
-		Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.5");
-		Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1");
-		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-	}
-	@Test(expected = IllegalArgumentException.class)
-	public void illegalArgument() {
-		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1.1");
-		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-	}
-	@Test(expected = IllegalArgumentException.class)
-	public void illegalArgumentNegative() {
-		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "-0.01");
-		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-	}
-	@Test(expected = IllegalArgumentException.class)
-	public void tooMuchCutoff() {
-		Configuration conf = new Configuration();
-		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "6000");
-		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
-	}

       config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
-    val (executionContext,
-      instanceManager,
-      scheduler,
-      libraryCacheManager,
-      _,
-      executionRetries,
-      delayBetweenRetries,
-      timeout,
-      archiveCount,
-      leaderElectionService) = JobManager.createJobManagerComponents(config)
-      val testArchiveProps = Props(
-        new TestingMemoryArchivist(archiveCount))
-    val archiver = actorSystem.actorOf(testArchiveProps, archiveName)
-    val jobManagerProps = Props(
-      new TestingJobManager(
-        configuration,
-        executionContext,
-        instanceManager,
-        scheduler,
-        libraryCacheManager,
-        archiver,
-        executionRetries,
-        delayBetweenRetries,
-        timeout,
-        streamingMode,
-        leaderElectionService))
-    val jobManager = actorSystem.actorOf(jobManagerProps, jobManagerName)
+    val (jobManager, _) = JobManager.startJobManagerActors(
+      config,
+      actorSystem,
+      Some(jobManagerName),
+      Some(archiveName),
+      streamingMode,
+      classOf[TestingJobManager],
+      classOf[TestingMemoryArchivist])
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -125,7 +126,12 @@ public abstract class AbstractProcessFailureRecoveryTest extends TestLogger {
 			jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem, StreamingMode.STREAMING)._1();
+			ActorRef jmActor = JobManager.startJobManagerActors(
+				jmConfig,
+				jmActorSystem,
+				StreamingMode.STREAMING,
+				JobManager.class,
+				MemoryArchivist.class)._1();
 			// the TaskManager java command
 			String[] command = new String[] {
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
@@ -100,7 +101,12 @@ public class ProcessFailureCancelingITCase {
 			jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10 s");
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem, StreamingMode.BATCH_ONLY)._1();
+			ActorRef jmActor = JobManager.startJobManagerActors(
+				jmConfig,
+				jmActorSystem,
+				StreamingMode.BATCH_ONLY,
+				JobManager.class,
+				MemoryArchivist.class)._1();
 			// the TaskManager java command
 			String[] command = new String[] {
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<scope>compile</scope>
+		</dependency>
@@ -177,6 +182,140 @@ under the License.
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+public class FlinkYarnSessionCliTest {
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+	/**
+	 * Checks that a dynamic property passed via "-D" on the YARN session command line is encoded
+	 * by the YARN client and decoded again by {@link CliFrontend#getDynamicProperties}.
+	 */
+	@Test
+	public void testDynamicProperties() throws IOException {
+		// point FLINK_CONF_DIR to a directory containing an empty flink-conf.yaml
+		Map<String, String> map = new HashMap<String, String>(System.getenv());
+		File tmpFolder = tmp.newFolder();
+		File fakeConf = new File(tmpFolder, "flink-conf.yaml");
+		fakeConf.createNewFile();
+		map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
+		TestBaseUtils.setEnv(map);
+		Options options = new Options();
+		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
+		cli.getYARNSessionCLIOptions(options);
+		CommandLineParser parser = new PosixParser();
+		CommandLine cmd = null;
+		try {
+			cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail("Parsing failed with " + e.getMessage());
+		}
+		AbstractFlinkYarnClient flinkYarnClient = cli.createFlinkYarnClient(cmd);
+		Assert.assertNotNull(flinkYarnClient);
+		List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
+		Assert.assertEquals(1, dynProperties.size());
+		Assert.assertEquals("akka.ask.timeout", dynProperties.get(0).f0);
+		Assert.assertEquals("5 min", dynProperties.get(0).f1);
+	}
+}
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
+/**
+ * Yarn application master which starts the {@link TestingYarnJobManager} and the
+ * {@link TestingMemoryArchivist}.
+ */
+public class TestingApplicationMaster extends ApplicationMasterBase {
+	@Override
+	public Class<? extends JobManager> getJobManagerClass() {
+		return TestingYarnJobManager.class;
+	}
+	@Override
+	public Class<? extends MemoryArchivist> getArchivistClass() {
+		return TestingMemoryArchivist.class;
+	}
+	/**
+	 * Entry point of the testing application master. Creates the master and runs it with the
+	 * given command line arguments.
+	 *
+	 * @param args command line arguments passed on to {@code run}
+	 */
+	public static void main(String[] args) {
+		TestingApplicationMaster applicationMaster = new TestingApplicationMaster();
+		applicationMaster.run(args);
+	}
+}
+import java.util.ArrayList;
+import java.util.List;
+/**
+ * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the
+ * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the set of files which
+ * are shipped to the yarn cluster. This is necessary to load the testing classes.
+ */
+public class TestingFlinkYarnClient extends FlinkYarnClientBase {
+	public TestingFlinkYarnClient() {
+		List<File> filesToShip = new ArrayList<>();
+		File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));
+		Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " +
+			"Make sure to package the flink-yarn-tests module.");
+		File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime"));
+		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " +
+			"jar. Make sure to package the flink-runtime module.");
+		filesToShip.add(testingJar);
+		filesToShip.add(testingRuntimeJar);
+		setShipFiles(filesToShip);
+	}
+	@Override
+	protected Class<?> getApplicationMasterClass() {
+		return TestingApplicationMaster.class;
+	}
+	/**
+	 * Accepts files named {@code <jarName>...-tests.jar} whose directory path contains a
+	 * directory named exactly {@code jarName}.
+	 */
+	public static class TestJarFinder implements FilenameFilter {
+		private final String jarName;
+		public TestJarFinder(final String jarName) {
+			this.jarName = jarName;
+		}
+		@Override
+		public boolean accept(File dir, String name) {
+			// separator is a static field; access it via the class rather than the dir instance
+			return name.startsWith(jarName) && name.endsWith("-tests.jar") &&
+				dir.getAbsolutePath().contains(File.separator + jarName + File.separator);
+		}
+	}
+}
+/**
+ * Yarn TaskManager runner which starts a {@link TestingYarnTaskManager}.
+ */
+public class TestingYarnTaskManagerRunner {
+	/**
+	 * Delegates to {@link YarnTaskManagerRunner#runYarnTaskManager} with the testing TaskManager class.
+	 *
+	 * @param args command line arguments of the TaskManager container
+	 */
+	public static void main(String[] args) throws IOException {
+		YarnTaskManagerRunner.runYarnTaskManager(args, TestingYarnTaskManager.class);
+	}
+}
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Level;
 import org.apache.log4j.spi.LoggingEvent;
@@ -35,7 +37,7 @@ public class UtilsTest {
 	public void testUberjarLocator() {
-		File dir = YarnTestBase.findFile(".", new YarnTestBase.RootDirFilenameFilter());
+		File dir = YarnTestBase.findFile("..", new YarnTestBase.RootDirFilenameFilter());
 		dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root
@@ -47,6 +49,54 @@ public class UtilsTest {
+	/**
+	 * Checks the YARN heap cutoff: with a cutoff ratio of 15% and a minimum cutoff of 384 MB,
+	 * the larger of the two amounts is removed from the container memory. Afterwards both
+	 * settings are changed to verify that they are read from the configuration.
+	 */
+	@Test
+	public void testHeapCutoff() {
+		final Configuration cfg = new Configuration();
+		cfg.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
+		cfg.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);
+		// 15% of 1000 MB is below the minimum, so 384 MB are cut off
+		Assert.assertEquals(616, Utils.calculateHeapSize(1000, cfg));
+		// for these sizes 15% exceeds the 384 MB minimum
+		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, cfg));
+		Assert.assertEquals(3400, Utils.calculateHeapSize(4000, cfg));
+		// minimum of 1000 MB dominates a 10% ratio (400 MB)
+		cfg.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "1000");
+		cfg.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.1");
+		Assert.assertEquals(3000, Utils.calculateHeapSize(4000, cfg));
+		// a 50% ratio (2000 MB) exceeds the 1000 MB minimum
+		cfg.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.5");
+		Assert.assertEquals(2000, Utils.calculateHeapSize(4000, cfg));
+		// a ratio of 1 leaves no heap at all
+		cfg.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, cfg));
+	}
+	@Test(expected = IllegalArgumentException.class)
+	public void illegalArgument() {
+		// a cutoff ratio above 1 must be rejected; the assertion is only reached (and the
+		// test only fails) if no exception is thrown
+		final Configuration cfg = new Configuration();
+		cfg.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1.1");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, cfg));
+	}
+	@Test(expected = IllegalArgumentException.class)
+	public void illegalArgumentNegative() {
+		// a negative cutoff ratio must be rejected
+		final Configuration cfg = new Configuration();
+		cfg.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "-0.01");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, cfg));
+	}
+	@Test(expected = IllegalArgumentException.class)
+	public void tooMuchCutoff() {
+		// a minimum cutoff (6000 MB) larger than the container memory (4000 MB) must be rejected
+		final Configuration cfg = new Configuration();
+		cfg.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "6000");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, cfg));
+	}
 	// --------------- Tools to test if a certain string has been logged with Log4j. -------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/
+import akka.testkit.JavaTestKit;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+public class YARNHighAvailabilityITCase extends YarnTestBase {
+	private static TestingServer zkServer;
+	private static ActorSystem actorSystem;
+	private static final int numberApplicationAttempts = 10;
+	@BeforeClass
+	public static void setup() {
+		actorSystem = AkkaUtils.createDefaultActorSystem();
+		try {
+			zkServer = new TestingServer();
+			zkServer.start();
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Could not start ZooKeeper testing cluster.");
+		}
+		yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha");
+		// one attempt per AM kill in the test plus the final, surviving attempt
+		yarnConfiguration.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" + numberApplicationAttempts);
+		startYARNWithConfig(yarnConfiguration);
+	}
+	@AfterClass
+	public static void teardown() throws IOException {
+		if(zkServer != null) {
+			zkServer.stop();
+		}
+		// guard against a failed setup that never created the actor system
+		if (actorSystem != null) {
+			JavaTestKit.shutdownActorSystem(actorSystem);
+			actorSystem = null;
+		}
+	}
+	/**
+	 * Tests that the application master can be killed multiple times and that the surviving
+	 * TaskManager successfully reconnects to the newly started JobManager.
+	 * @throws Exception
+	 */
+	@Test
+	public void testMultipleAMKill() throws Exception {
+		final int numberKillingAttempts = numberApplicationAttempts - 1;
+		TestingFlinkYarnClient flinkYarnClient = new TestingFlinkYarnClient();
+		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
+		flinkYarnClient.setTaskManagerCount(1);
+		flinkYarnClient.setJobManagerMemory(768);
+		flinkYarnClient.setTaskManagerMemory(1024);
+		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+		flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+		String confDirPath = System.getenv("FLINK_CONF_DIR");
+		flinkYarnClient.setConfigurationDirectory(confDirPath);
+		flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
+		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@ha.zookeeper.quorum=" +
+			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts);
+		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
+		AbstractFlinkYarnCluster yarnCluster = null;
+		final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
+		try {
+			yarnCluster = flinkYarnClient.deploy();
+			yarnCluster.connectToCluster();
+			final Configuration config = yarnCluster.getFlinkConfiguration();
+			new JavaTestKit(actorSystem) {{
+				// captured so that the anonymous Within blocks can hand the test kit to the helper
+				final JavaTestKit testKit = this;
+				for (int attempt = 0; attempt < numberKillingAttempts; attempt++) {
+					new Within(timeout) {
+						@Override
+						protected void run() {
+							ActorGateway gateway = awaitTaskManagerRegistration(testKit, config, timeout);
+							// kill the current leader so that a new application master is started
+							gateway.tell(PoisonPill.getInstance());
+						}
+					};
+				}
+				new Within(timeout) {
+					@Override
+					protected void run() {
+						awaitTaskManagerRegistration(testKit, config, timeout);
+					}
+				};
+			}};
+		} finally {
+			if (yarnCluster != null) {
+				yarnCluster.shutdown(false);
+			}
+		}
+	}
+	/**
+	 * Retrieves the current leading JobManager and waits until at least one TaskManager is
+	 * registered with it.
+	 *
+	 * @param testKit test kit whose actor receives the acknowledgement
+	 * @param config Flink configuration used to create the leader retrieval service
+	 * @param timeout timeout for retrieving the leader gateway
+	 * @return gateway to the current leader
+	 * @throws AssertionError if the leader cannot be retrieved or the registration is not acknowledged
+	 */
+	private static ActorGateway awaitTaskManagerRegistration(
+			JavaTestKit testKit,
+			Configuration config,
+			FiniteDuration timeout) {
+		try {
+			LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+			ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
+			ActorGateway selfGateway = new AkkaActorGateway(testKit.getRef(), gateway.leaderSessionID());
+			gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
+			testKit.expectMsgEquals(Messages.getAcknowledge());
+			return gateway;
+		} catch (Exception e) {
+			throw new AssertionError("Could not complete test.", e);
+		}
+	}
+}
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 			Map<String, String> map = new HashMap<String, String>(System.getenv());
-			File flinkConfFilePath = findFile(flinkDistRootDir, new ContainsName(new String[] {"flink-conf.yaml"}));
-			Assert.assertNotNull(flinkConfFilePath);
-			map.put("FLINK_CONF_DIR", flinkConfFilePath.getParent());
+			File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
+			Assert.assertNotNull(flinkConfDirPath);
+			map.put("FLINK_CONF_DIR", flinkConfDirPath.getParent());
 			File yarnConfFile = writeYarnSiteConfigXML(conf);
 			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
 			map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos
@@ -580,11 +583,7 @@ public abstract class YarnTestBase extends TestLogger {
 	// -------------------------- Tear down -------------------------- //
-	public static void tearDown() {
-		/*
-			We don't shut down the MiniCluster, as it is prone to blocking infinitely.
-		*/
+	public static void copyOnTravis() {
 		// When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files)
 		// to <flinkRoot>/target/flink-yarn-tests-*.
 		// The files from there are picked up by the ./tools/ script
 # log whats going on between the tests, console, console
+package org.apache.flink.yarn
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+/** [[YarnJobManager]] implementation which mixes in the [[TestingJobManagerLike]] mixin.
+  *
+  * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition
+  * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
+  *
+  * @param flinkConfiguration Configuration object for the actor
+  * @param executionContext Execution context which is used to execute concurrent tasks in the
+  *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param instanceManager Instance manager to manage the registered
+  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
+  * @param scheduler Scheduler to schedule Flink jobs
+  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param archive Archive for finished Flink jobs
+  * @param defaultExecutionRetries Number of default execution retries
+  * @param delayBetweenRetries Delay between retries
+  * @param timeout Timeout for futures
+  * @param mode StreamingMode in which the system shall be started
+  * @param leaderElectionService LeaderElectionService to participate in the leader election
+  */
+class TestingYarnJobManager(
+    flinkConfiguration: Configuration,
+    executionContext: ExecutionContext,
+    instanceManager: InstanceManager,
+    scheduler: Scheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    defaultExecutionRetries: Int,
+    delayBetweenRetries: Long,
+    timeout: FiniteDuration,
+    mode: StreamingMode,
+    leaderElectionService: LeaderElectionService)
+  extends YarnJobManager(
+    flinkConfiguration,
+    executionContext,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    archive,
+    defaultExecutionRetries,
+    delayBetweenRetries,
+    timeout,
+    mode,
+    leaderElectionService)
+  with TestingJobManagerLike {
+  override val taskManagerRunnerClass = classOf[TestingYarnTaskManagerRunner]
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn
+import org.apache.flink.runtime.instance.InstanceConnectionInfo
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
+/** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin.
+  *
+  * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition
+  * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
+  *
+  * All constructor arguments are passed through unchanged to [[YarnTaskManager]].
+  *
+  * @param config Configuration object for the actor
+  * @param connectionInfo Connection information of this actor
+  * @param memoryManager MemoryManager which is responsible for Flink's managed memory allocation
+  * @param ioManager IOManager responsible for I/O
+  * @param network NetworkEnvironment for this actor
+  * @param numberOfSlots Number of slots for this TaskManager
+  * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading
+  *                              JobManager
+  */
+class TestingYarnTaskManager(
+    config: TaskManagerConfiguration,
+    connectionInfo: InstanceConnectionInfo,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    leaderRetrievalService: LeaderRetrievalService)
+  extends YarnTaskManager(
+    config,
+    connectionInfo,
+    memoryManager,
+    ioManager,
+    network,
+    numberOfSlots,
+    leaderRetrievalService)
+  with TestingTaskManagerLike {}
-      }
-    case NotifyListeners =>
-      for(jobID <- currentJobs.keySet){
-        notifyListeners(jobID)
-      }
-      if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) {
-        periodicCheck foreach { _.cancel() }
-        periodicCheck = None
-      }
-    case NotifyWhenJobRemoved(jobID) =>
-      val gateways =
-      val responses ={
-        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
-      }
-      val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
-      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
-      import context.dispatcher
-      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender
-    case CheckIfJobRemoved(jobID) =>
-      if(currentJobs.contains(jobID)) {
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID))
-        )(context.dispatcher, sender())
-      } else {
-        sender() ! decorateMessage(true)
-      }
-    case NotifyWhenTaskManagerTerminated(taskManager) =>
-      val waiting = waitForTaskManagerToBeTerminated.getOrElse(, Set())
-      waitForTaskManagerToBeTerminated += -> (waiting + sender)
-    case msg@Terminated(taskManager) =>
-      super.handleMessage(msg)
-      waitForTaskManagerToBeTerminated.remove( foreach {
-        _ foreach {
-          listener =>
-            listener ! decorateMessage(TaskManagerTerminated(taskManager))
-        }
-      }
-    case NotifyWhenAccumulatorChange(jobID) =>
-      val (updated, registered) = waitForAccumulatorUpdate.
-        getOrElse(jobID, (false, Set[ActorRef]()))
-      waitForAccumulatorUpdate += jobID -> (updated, registered + sender)
-      sender ! true
-    /**
-     * Notification from the task manager that changed accumulator are transferred on next
-     * Hearbeat. We need to keep this state to notify the listeners on next Heartbeat report.
-     */
-    case AccumulatorsChanged(jobID: JobID) =>
-      waitForAccumulatorUpdate.get(jobID) match {
-        case Some((updated, registered)) =>
-          waitForAccumulatorUpdate.put(jobID, (true, registered))
-        case None =>
-      }
-    /**
-     * Disabled async processing of accumulator values and send accumulators to the listeners if
-     * we previously received an [[AccumulatorsChanged]] message.
-     */
-    case msg : Heartbeat =>
-      super.handleMessage(msg)
-      waitForAccumulatorUpdate foreach {
-        case (jobID, (updated, actors)) if updated =>
-          currentJobs.get(jobID) match {
-            case Some((graph, jobInfo)) =>
-              val flinkAccumulators = graph.getFlinkAccumulators
-              val userAccumulators = graph.aggregateUserAccumulators
-              actors foreach {
-                actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
-              }
-            case None =>
-          }
-          waitForAccumulatorUpdate.put(jobID, (false, actors))
-        case _ =>
-      }
-    case RequestWorkingTaskManager(jobID) =>
-      currentJobs.get(jobID) match {
-        case Some((eg, _)) =>
-          if(eg.getAllExecutionVertices.asScala.isEmpty){
-            sender ! decorateMessage(WorkingTaskManager(None))
-          } else {
-            val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource
-            if(resource == null){
-              sender ! decorateMessage(WorkingTaskManager(None))
-            } else {
-              sender ! decorateMessage(
-                WorkingTaskManager(
-                  Some(resource.getInstance().getActorGateway)
-                )
-              )
-            }
-          }
-        case None => sender ! decorateMessage(WorkingTaskManager(None))
-      }
-    case NotifyWhenJobStatus(jobID, state) =>
-      val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
-        scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
-      val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
-      jobStatusListener += state -> (listener + sender)
-    case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
-      super.handleMessage(msg)
-      val cleanup = waitForJobStatus.get(jobID) match {
-        case Some(stateListener) =>
-          stateListener.remove(newJobStatus) match {
-            case Some(listeners) =>
-              listeners foreach {
-                _ ! decorateMessage(JobStatusIs(jobID, newJobStatus))
-              }
-            case _ =>
-          }
-          stateListener.isEmpty
-        case _ => false
-      }
-      if (cleanup) {
-        waitForJobStatus.remove(jobID)
-      }
-    case DisableDisconnect =>
-      disconnectDisabled = true
-    case msg: Disconnect =>
-      if (!disconnectDisabled) {
-        super.handleMessage(msg)
-        val taskManager = sender
-        waitForTaskManagerToBeTerminated.remove( foreach {
-          _ foreach {
-            listener =>
-              listener ! decorateMessage(TaskManagerTerminated(taskManager))
-          }
-        }
-      }
-    case NotifyWhenLeader =>
-      if (leaderElectionService.hasLeadership) {
-        sender() ! true
-      } else {
-        waitForLeader += sender()
-      }
-    case msg: GrantLeadership =>
-      super.handleMessage(msg)
-      waitForLeader.foreach(_ ! true)
-      waitForLeader.clear()
-  }
-  def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
-    currentJobs.get(jobID) match {
-      case Some((eg, _)) =>
-        eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING)
-      case None => false
-    }
-  }
-  def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = {
-    currentJobs.get(jobID) match {
-      case Some((eg, _)) =>
-        eg.getAllExecutionVertices.asScala.forall {
-          case vertex =>
-            (vertex.getExecutionState == ExecutionState.RUNNING
-              || vertex.getExecutionState == ExecutionState.FINISHED)
-        }
-      case None => false
-    }
-  }
-  def notifyListeners(jobID: JobID): Unit = {
-    if(checkIfAllVerticesRunning((jobID))) {
-      waitForAllVerticesToBeRunning.remove(jobID) match {
-        case Some(listeners) =>
-          for (listener <- listeners) {
-            listener ! decorateMessage(AllVerticesRunning(jobID))
-          }
-        case _ =>
-      }
-    }
-    if(checkIfAllVerticesRunningOrFinished(jobID)) {
-      waitForAllVerticesToBeRunningOrFinished.remove(jobID) match {
-        case Some(listeners) =>
-          for (listener <- listeners) {
-            listener ! decorateMessage(AllVerticesRunning(jobID))
-          }
-        case _ =>
-      }
-    }
-  }
+    leaderElectionService)
+  with TestingJobManagerLike {}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
new file mode 100644
index 0000000..e91f068
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -0,0 +1,364 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.testingUtils
+import{Terminated, Cancellable, ActorRef}
+import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
+import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
+import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged
+import scala.collection.mutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import language.postfixOps
+/** This mixin can be used to decorate a JobManager with messages for testing purpose.  */
+trait TestingJobManagerLike extends FlinkActor {
+  that: JobManager =>
+  import scala.collection.JavaConverters._
+  import context._
+  /** Listeners waiting until all execution vertices of a job are RUNNING, keyed by job. */
+  val waitForAllVerticesToBeRunning = mutable.HashMap[JobID, Set[ActorRef]]()
+  /** Listeners waiting for a TaskManager to terminate, keyed by a String identifying it. */
+  val waitForTaskManagerToBeTerminated = mutable.HashMap[String, Set[ActorRef]]()
+  /** Listeners waiting until all execution vertices of a job are RUNNING or FINISHED. */
+  val waitForAllVerticesToBeRunningOrFinished =
+    mutable.HashMap[JobID, Set[ActorRef]]()
+  /** Handle of the currently scheduled periodic listener check, if one is running. */
+  var periodicCheck: Option[Cancellable] = None
+  /** Listeners waiting for a job to reach a given [[JobStatus]], keyed by job and status. */
+  val waitForJobStatus = mutable.HashMap[JobID, mutable.HashMap[JobStatus, Set[ActorRef]]]()
+  /** Per job: flag whether accumulators changed since the last heartbeat, plus the listeners. */
+  val waitForAccumulatorUpdate = mutable.HashMap[JobID, (Boolean, Set[ActorRef])]()
+  /** Listeners waiting for this JobManager to be granted leadership. */
+  val waitForLeader = mutable.HashSet[ActorRef]()
+  /** Listeners waiting for a minimum number of registered TaskManagers, paired with that
+    * minimum. The ordering is reversed so that the entry with the smallest minimum is at the
+    * head of the (max-)priority queue. `Ordering.by` is used instead of a subtraction-based
+    * comparator, which could overflow for extreme values.
+    */
+  val waitForNumRegisteredTaskManagers: mutable.PriorityQueue[(Int, ActorRef)] =
+    new mutable.PriorityQueue[(Int, ActorRef)]()(
+      Ordering.by[(Int, ActorRef), Int](_._1).reverse)
+  /** When true, incoming Disconnect messages are swallowed without being handled. */
+  var disconnectDisabled = false
+  /** Gives the testing handler the first chance at every message and delegates everything it
+    * does not match to the handler of the decorated JobManager.
+    */
+  abstract override def handleMessage: Receive =
+    handleTestingMessage.orElse(super.handleMessage)
+  /** Handles the messages which are only available in testing JobManagers.
+    *
+    * Messages that are not matched here fall through to the decorated JobManager's handler (see
+    * `handleMessage`). Several cases first delegate to `super.handleMessage` and afterwards
+    * notify the test listeners registered for that event.
+    */
+  def handleTestingMessage: Receive = {
+    // Liveness probe: acknowledge that this actor is processing messages.
+    case Alive => sender() ! Acknowledge
+    // Answer with the ExecutionGraph of a currently running job; for any other job the request
+    // is forwarded to the archive on behalf of the original sender.
+    case RequestExecutionGraph(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((executionGraph, _)) =>
+          sender() ! decorateMessage(ExecutionGraphFound(jobID, executionGraph))
+        case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender())
+      }
+    case WaitForAllVerticesToBeRunning(jobID) =>
+      if (checkIfAllVerticesRunning(jobID)) {
+        sender() ! decorateMessage(AllVerticesRunning(jobID))
+      } else {
+        val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]())
+        waitForAllVerticesToBeRunning += jobID -> (waiting + sender())
+        schedulePeriodicCheckIfNeeded()
+      }
+    case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
+      if (checkIfAllVerticesRunningOrFinished(jobID)) {
+        sender() ! decorateMessage(AllVerticesRunning(jobID))
+      } else {
+        val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]())
+        waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender())
+        schedulePeriodicCheckIfNeeded()
+      }
+    // Periodic tick: notify all listeners whose condition is met and stop ticking once nobody
+    // is waiting anymore.
+    case NotifyListeners =>
+      for (jobID <- currentJobs.keySet) {
+        notifyListeners(jobID)
+      }
+      if (waitForAllVerticesToBeRunning.isEmpty &&
+        waitForAllVerticesToBeRunningOrFinished.isEmpty) {
+        periodicCheck foreach { _.cancel() }
+        periodicCheck = None
+      }
+    // Answers true once every registered TaskManager and this JobManager report that the job
+    // has been removed.
+    case NotifyWhenJobRemoved(jobID) =>
+      val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway)
+      val responses = gateways.map {
+        gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean]
+      }
+      val jobRemovedOnJobManager = (self ? CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean]
+      val allFutures = responses ++ Seq(jobRemovedOnJobManager)
+      import context.dispatcher
+      Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender()
+    // Re-check every 200 ms, keeping the original sender, until the job is no longer running.
+    case CheckIfJobRemoved(jobID) =>
+      if (currentJobs.contains(jobID)) {
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID))
+        )(context.dispatcher, sender())
+      } else {
+        sender() ! decorateMessage(true)
+      }
+    case NotifyWhenTaskManagerTerminated(taskManager) =>
+      val key = taskManager.path.name
+      val waiting = waitForTaskManagerToBeTerminated.getOrElse(key, Set[ActorRef]())
+      waitForTaskManagerToBeTerminated += key -> (waiting + sender())
+    case msg@Terminated(taskManager) =>
+      super.handleMessage(msg)
+      notifyTaskManagerTerminated(taskManager)
+    case NotifyWhenAccumulatorChange(jobID) =>
+      val (updated, registered) = waitForAccumulatorUpdate.
+        getOrElse(jobID, (false, Set[ActorRef]()))
+      waitForAccumulatorUpdate += jobID -> ((updated, registered + sender()))
+      sender() ! true
+    // Notification from the TaskManager that changed accumulators are transferred with the next
+    // Heartbeat. Remember this so that the listeners are notified on the next Heartbeat report.
+    // Jobs without registered listeners are ignored.
+    case AccumulatorsChanged(jobID: JobID) =>
+      waitForAccumulatorUpdate.get(jobID) foreach {
+        case (_, registered) => waitForAccumulatorUpdate.put(jobID, (true, registered))
+      }
+    // On every Heartbeat, send the current accumulator values to the listeners of each job for
+    // which an AccumulatorsChanged message was received since the last Heartbeat.
+    case msg : Heartbeat =>
+      super.handleMessage(msg)
+      // Snapshot the pending jobs first so the map is not modified while it is being traversed.
+      val pendingUpdates = waitForAccumulatorUpdate.toList.collect {
+        case (jobID, (true, actors)) => (jobID, actors)
+      }
+      pendingUpdates foreach {
+        case (jobID, actors) =>
+          currentJobs.get(jobID) foreach {
+            case (graph, _) =>
+              val flinkAccumulators = graph.getFlinkAccumulators
+              val userAccumulators = graph.aggregateUserAccumulators
+              actors foreach {
+                actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators)
+              }
+          }
+          waitForAccumulatorUpdate.put(jobID, (false, actors))
+      }
+    // Reply with the gateway of the TaskManager assigned to the job's first execution vertex,
+    // or None if the job, the vertex or the assigned resource is unknown.
+    case RequestWorkingTaskManager(jobID) =>
+      val workingTaskManager = for {
+        (eg, _) <- currentJobs.get(jobID)
+        vertex <- eg.getAllExecutionVertices.asScala.headOption
+        resource <- Option(vertex.getCurrentAssignedResource)
+      } yield resource.getInstance().getActorGateway
+      sender() ! decorateMessage(WorkingTaskManager(workingTaskManager))
+    case NotifyWhenJobStatus(jobID, state) =>
+      val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
+        mutable.HashMap[JobStatus, Set[ActorRef]]())
+      val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
+      jobStatusListener += state -> (listener + sender())
+    // Notify the listeners of the new status and drop the job entry once nobody waits on it.
+    case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
+      super.handleMessage(msg)
+      waitForJobStatus.get(jobID) foreach { stateListener =>
+        stateListener.remove(newJobStatus) foreach { listeners =>
+          listeners foreach { _ ! decorateMessage(JobStatusIs(jobID, newJobStatus)) }
+        }
+        if (stateListener.isEmpty) {
+          waitForJobStatus.remove(jobID)
+        }
+      }
+    case DisableDisconnect =>
+      disconnectDisabled = true
+    case msg: Disconnect =>
+      if (!disconnectDisabled) {
+        super.handleMessage(msg)
+        notifyTaskManagerTerminated(sender())
+      }
+    case NotifyWhenLeader =>
+      if (leaderElectionService.hasLeadership) {
+        sender() ! true
+      } else {
+        waitForLeader += sender()
+      }
+    case msg: GrantLeadership =>
+      super.handleMessage(msg)
+      waitForLeader.foreach(_ ! true)
+      waitForLeader.clear()
+    case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
+      if (instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {
+        // there are already at least numRegisteredTaskManager registered --> send Acknowledge
+        sender() ! Acknowledge
+      } else {
+        // wait until we see at least numRegisteredTaskManager being registered at the JobManager
+        waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender()))
+      }
+    case msg: RegisterTaskManager =>
+      super.handleMessage(msg)
+      // Dequeue all senders which wait for instanceManager.getNumberOfRegisteredTaskManagers or
+      // fewer registered TaskManagers; the queue yields the smallest threshold first.
+      while (waitForNumRegisteredTaskManagers.nonEmpty &&
+        waitForNumRegisteredTaskManagers.head._1 <=
+          instanceManager.getNumberOfRegisteredTaskManagers) {
+        val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
+        receiver ! Acknowledge
+      }
+  }
+  /** Starts the periodic `NotifyListeners` tick (every 200 ms) unless it is already running. */
+  private def schedulePeriodicCheckIfNeeded(): Unit = {
+    if (periodicCheck.isEmpty) {
+      periodicCheck = Some(
+        context.system.scheduler.schedule(
+          0 seconds,
+          200 millis,
+          self,
+          decorateMessage(NotifyListeners)))
+    }
+  }
+  /** Sends `TaskManagerTerminated` to, and forgets, all listeners registered for the given
+    * TaskManager. The key must match the one used by `NotifyWhenTaskManagerTerminated`.
+    */
+  private def notifyTaskManagerTerminated(taskManager: ActorRef): Unit = {
+    waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach { listeners =>
+      listeners foreach { _ ! decorateMessage(TaskManagerTerminated(taskManager)) }
+    }
+  }
+  /** Returns true if the job is currently known and every one of its execution vertices is in
+    * state RUNNING. Unknown jobs yield false.
+    */
+  def checkIfAllVerticesRunning(jobID: JobID): Boolean =
+    currentJobs.get(jobID) exists {
+      case (executionGraph, _) =>
+        executionGraph.getAllExecutionVertices.asScala forall { vertex =>
+          vertex.getExecutionState == ExecutionState.RUNNING
+        }
+    }
+  /** Returns true if the job is currently known and every one of its execution vertices is
+    * either RUNNING or FINISHED. Unknown jobs yield false.
+    */
+  def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean =
+    currentJobs.get(jobID) exists {
+      case (executionGraph, _) =>
+        executionGraph.getAllExecutionVertices.asScala forall { vertex =>
+          vertex.getExecutionState == ExecutionState.RUNNING ||
+            vertex.getExecutionState == ExecutionState.FINISHED
+        }
+    }
+  /** Sends `AllVerticesRunning` to, and removes, the listeners of the given job whose condition
+    * is satisfied now. Listeners waiting for "running or finished" receive `AllVerticesRunning`
+    * as well.
+    */
+  def notifyListeners(jobID: JobID): Unit = {
+    def release(waiters: mutable.HashMap[JobID, Set[ActorRef]], satisfied: Boolean): Unit = {
+      if (satisfied) {
+        waiters.remove(jobID) foreach { listeners =>
+          listeners foreach { _ ! decorateMessage(AllVerticesRunning(jobID)) }
+        }
+      }
+    }
+    release(waitForAllVerticesToBeRunning, checkIfAllVerticesRunning(jobID))
+    release(waitForAllVerticesToBeRunningOrFinished, checkIfAllVerticesRunningOrFinished(jobID))
+  }
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index acade53..4f5cf14 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -60,7 +60,8 @@ object TestingJobManagerMessages {
    * Registers a listener to receive a message when accumulators changed.
    * The change must be explicitly triggered by the TestingTaskManager which can receive an
-   * [[AccumulatorChanged]] message by a task that changed the accumulators. This message is then
+   * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
+   * message by a task that changed the accumulators. This message is then
    * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]]
    * message when the next Heartbeat occurs.
@@ -78,5 +79,13 @@ object TestingJobManagerMessages {
   case object NotifyWhenLeader
+  /**
+   * Registers the sender to be notified by an
+   * [[org.apache.flink.runtime.messages.Messages.Acknowledge]] message as soon as at least
+   * numRegisteredTaskManager TaskManagers have registered at the JobManager. If that many
+   * TaskManagers are already registered, the acknowledgement is sent right away.
+   *
+   * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified
+   */
+  case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int)
   def getNotifyWhenLeader: AnyRef = NotifyWhenLeader
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
index 167afbf..71a7e3e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala
@@ -18,7 +18,6 @@
 package org.apache.flink.runtime.testingUtils
-import org.apache.flink.runtime.FlinkActor
 import org.apache.flink.runtime.jobmanager.MemoryArchivist
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphNotFound, ExecutionGraphFound, RequestExecutionGraph}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 8746fbb..0b38c9c 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -65,182 +65,5 @@ class TestingTaskManager(
-    leaderRetrievalService) {
-  import scala.collection.JavaConverters._
-  val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
-  val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
-  val waitForRegisteredAtJobManager = scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
-  val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
-  val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
-  var disconnectDisabled = false
-  /**
-   * Handler for testing related messages
-   */
-  override def handleMessage: Receive = {
-    handleTestingMessage orElse super.handleMessage
-  }
-  def handleTestingMessage: Receive = {
-    case Alive => sender() ! Acknowledge
-    case NotifyWhenTaskIsRunning(executionID) => {
-      Option(runningTasks.get(executionID)) match {
-        case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
-          sender ! decorateMessage(true)
-        case _ =>
-          val listeners = waitForRunning.getOrElse(executionID, Set())
-          waitForRunning += (executionID -> (listeners + sender))
-      }
-    }
-    case RequestRunningTasks =>
-      sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
-    case NotifyWhenTaskRemoved(executionID) =>
-      Option(runningTasks.get(executionID)) match {
-        case Some(_) =>
-          val set = waitForRemoval.getOrElse(executionID, Set())
-          waitForRemoval += (executionID -> (set + sender))
-        case None =>
-          if(unregisteredTasks.contains(executionID)) {
-            sender ! decorateMessage(true)
-          } else {
-              val set = waitForRemoval.getOrElse(executionID, Set())
-              waitForRemoval += (executionID -> (set + sender))
-          }
-      }
-    case TaskInFinalState(executionID) =>
-      super.handleMessage(TaskInFinalState(executionID))
-      waitForRemoval.remove(executionID) match {
-        case Some(actors) => for(actor <- actors) actor ! decorateMessage(true)
-        case None =>
-      }
-      unregisteredTasks += executionID
-    case RequestBroadcastVariablesWithReferences =>
-      sender ! decorateMessage(
-        ResponseBroadcastVariablesWithReferences(
-          bcVarManager.getNumberOfVariablesWithReferences)
-      )
-    case RequestNumActiveConnections =>
-      val numActive = if (network.isAssociated) {
-                        network.getConnectionManager.getNumberOfActiveConnections
-                      } else {
-                        0
-                      }
-      sender ! decorateMessage(ResponseNumActiveConnections(numActive))
-    case NotifyWhenJobRemoved(jobID) =>
-      if(runningTasks.values.asScala.exists(_.getJobID == jobID)){
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID)))(
-          context.dispatcher,
-          sender
-          )
-      }else{
-        sender ! decorateMessage(true)
-      }
-    case CheckIfJobRemoved(jobID) =>
-      if(runningTasks.values.asScala.forall(_.getJobID != jobID)){
-        sender ! decorateMessage(true)
-      } else {
-        context.system.scheduler.scheduleOnce(
-          200 milliseconds,
-          self,
-          decorateMessage(CheckIfJobRemoved(jobID)))(
-          context.dispatcher,
-          sender
-          )
-      }
-    case NotifyWhenJobManagerTerminated(jobManager) =>
-      val waiting = waitForJobManagerToBeTerminated.getOrElse(, Set())
-      waitForJobManagerToBeTerminated += -> (waiting + sender)
-    /**
-     * Message from task manager that accumulator values changed and need to be reported immediately
-     * instead of lazily through the
-     * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this
-     * message to the job manager that it knows it should report to the listeners.
-     */
-    case msg: AccumulatorsChanged =>
-      currentJobManager match {
-        case Some(jobManager) =>
-          jobManager.forward(msg)
-          sendHeartbeatToJobManager()
-          sender ! true
-        case None =>
-      }
-    case msg@Terminated(jobManager) =>
-      super.handleMessage(msg)
-      waitForJobManagerToBeTerminated.remove( foreach {
-        _ foreach {
-          _ ! decorateMessage(JobManagerTerminated(jobManager))
-        }
-      }
-    case msg:Disconnect =>
-      if (!disconnectDisabled) {
-        super.handleMessage(msg)
-        val jobManager = sender()
-        waitForJobManagerToBeTerminated.remove( foreach {
-          _ foreach {
-            _ ! decorateMessage(JobManagerTerminated(jobManager))
-          }
-        }
-      }
-    case DisableDisconnect =>
-      disconnectDisabled = true
-    case msg @ UpdateTaskExecutionState(taskExecutionState) =>
-      super.handleMessage(msg)
-      if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
-        waitForRunning.get(taskExecutionState.getID) foreach {
-          _ foreach (_ ! decorateMessage(true))
-        }
-      }
-    case RequestLeaderSessionID =>
-      sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
-    case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
-      if(isConnected && jobManager == currentJobManager.get) {
-        sender() ! true
-      } else {
-        val list = waitForRegisteredAtJobManager.getOrElse(
-          jobManager,
-          Set[ActorRef]())
-        waitForRegisteredAtJobManager += jobManager -> (list + sender())
-      }
-    case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
-      super.handleMessage(msg)
-      val jm = sender()
-      waitForRegisteredAtJobManager.remove(jm).foreach {
-        listeners => listeners.foreach{
-          listener =>
-            listener ! true
-        }
-      }
-  }
+    leaderRetrievalService)
+  with TestingTaskManagerLike {}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
new file mode 100644
index 0000000..0350675
--- /dev/null
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
@@ -0,0 +1,220 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.testingUtils
+import{Terminated, ActorRef}
+import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.execution.ExecutionState
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
+import org.apache.flink.runtime.messages.JobManagerMessages.{ResponseLeaderSessionID,
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
+import org.apache.flink.runtime.messages.TaskMessages.{UpdateTaskExecutionState, TaskInFinalState}
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
+import org.apache.flink.runtime.testingUtils.TestingMessages.{DisableDisconnect,
+CheckIfJobRemoved, Alive}
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._
+import scala.concurrent.duration._
+import language.postfixOps
+/**
+ * Mixin that decorates a [[TaskManager]] with messages for testing purposes.
+ *
+ * `handleMessage` tries [[handleTestingMessage]] first and falls back to the regular
+ * TaskManager handler for everything it does not match. Several cases intercept regular
+ * TaskManager messages (`TaskInFinalState`, `Terminated`, `Disconnect`,
+ * `UpdateTaskExecutionState`, registration acknowledgements), pass them on to
+ * `super.handleMessage` and afterwards notify the test actors that waited for the event.
+ */
+trait TestingTaskManagerLike extends FlinkActor {
+  that: TaskManager =>
+  import scala.collection.JavaConverters._
+  /** Actors waiting for a task to reach a final state, per execution attempt. */
+  val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+  /**
+   * Actors waiting for a JobManager to terminate or disconnect. Always keyed via
+   * `terminationListenerKey` so that registration and notification use the same key.
+   */
+  val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]()
+  /** Actors waiting for this TaskManager to be registered at a given JobManager. */
+  val waitForRegisteredAtJobManager = scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]()
+  /** Actors waiting for a task to switch to RUNNING, per execution attempt. */
+  val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]()
+  /** Execution attempts for which a `TaskInFinalState` message has been processed. */
+  val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]()
+  /** If true, incoming `Disconnect` messages are ignored (set by `DisableDisconnect`). */
+  var disconnectDisabled = false
+  /**
+   * Handler for testing related messages. Messages not matched by [[handleTestingMessage]]
+   * are handled by the regular TaskManager handler.
+   */
+  abstract override def handleMessage: Receive = {
+    handleTestingMessage orElse super.handleMessage
+  }
+  /** Key under which the listeners for the termination of `jobManager` are stored. */
+  private def terminationListenerKey(jobManager: ActorRef): String = jobManager.path.name
+  /** Adds `listener` to the listener set stored under `key`, creating the set if needed. */
+  private def registerTestListener[K](
+      listeners: scala.collection.mutable.HashMap[K, Set[ActorRef]],
+      key: K,
+      listener: ActorRef): Unit = {
+    listeners += key -> (listeners.getOrElse(key, Set[ActorRef]()) + listener)
+  }
+  /** Removes all actors waiting for `jobManager` to terminate and informs each of them. */
+  private def notifyJobManagerTerminatedListeners(jobManager: ActorRef): Unit = {
+    waitForJobManagerToBeTerminated.remove(terminationListenerKey(jobManager)) foreach {
+      _ foreach {
+        _ ! decorateMessage(JobManagerTerminated(jobManager))
+      }
+    }
+  }
+  def handleTestingMessage: Receive = {
+    case Alive => sender() ! Acknowledge
+    case NotifyWhenTaskIsRunning(executionID) =>
+      Option(runningTasks.get(executionID)) match {
+        case Some(task) if task.getExecutionState == ExecutionState.RUNNING =>
+          sender() ! decorateMessage(true)
+        case _ =>
+          registerTestListener(waitForRunning, executionID, sender())
+      }
+    case RequestRunningTasks =>
+      sender() ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap))
+    case NotifyWhenTaskRemoved(executionID) =>
+      // Reply immediately only if the task is no longer running AND has already been seen in a
+      // final state; otherwise wait for the corresponding TaskInFinalState message.
+      val alreadyRemoved =
+        Option(runningTasks.get(executionID)).isEmpty && unregisteredTasks.contains(executionID)
+      if (alreadyRemoved) {
+        sender() ! decorateMessage(true)
+      } else {
+        registerTestListener(waitForRemoval, executionID, sender())
+      }
+    case msg @ TaskInFinalState(executionID) =>
+      super.handleMessage(msg)
+      waitForRemoval.remove(executionID) foreach {
+        _ foreach (_ ! decorateMessage(true))
+      }
+      unregisteredTasks += executionID
+    case RequestBroadcastVariablesWithReferences =>
+      sender() ! decorateMessage(
+        ResponseBroadcastVariablesWithReferences(
+          bcVarManager.getNumberOfVariablesWithReferences)
+      )
+    case RequestNumActiveConnections =>
+      val numActive = if (network.isAssociated) {
+        network.getConnectionManager.getNumberOfActiveConnections
+      } else {
+        0
+      }
+      sender() ! decorateMessage(ResponseNumActiveConnections(numActive))
+    case NotifyWhenJobRemoved(jobID) =>
+      if (runningTasks.values.asScala.exists(_.getJobID == jobID)) {
+        // Poll again in 200 ms. The requester is passed as the sender of the scheduled message,
+        // so the CheckIfJobRemoved case below can answer it directly.
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID)))(
+            context.dispatcher,
+            sender()
+          )
+      } else {
+        sender() ! decorateMessage(true)
+      }
+    case CheckIfJobRemoved(jobID) =>
+      if (runningTasks.values.asScala.forall(_.getJobID != jobID)) {
+        sender() ! decorateMessage(true)
+      } else {
+        // keep polling on behalf of the original requester (the current sender)
+        context.system.scheduler.scheduleOnce(
+          200 milliseconds,
+          self,
+          decorateMessage(CheckIfJobRemoved(jobID)))(
+            context.dispatcher,
+            sender()
+          )
+      }
+    case NotifyWhenJobManagerTerminated(jobManager) =>
+      registerTestListener(
+        waitForJobManagerToBeTerminated,
+        terminationListenerKey(jobManager),
+        sender())
+    /**
+     * Message from task manager that accumulator values changed and need to be reported immediately
+     * instead of lazily through the
+     * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this
+     * message to the job manager that it knows it should report to the listeners.
+     */
+    case msg: AccumulatorsChanged =>
+      currentJobManager match {
+        case Some(jobManager) =>
+          jobManager.forward(msg)
+          sendHeartbeatToJobManager()
+          sender() ! true
+        case None =>
+      }
+    case msg @ Terminated(jobManager) =>
+      super.handleMessage(msg)
+      notifyJobManagerTerminatedListeners(jobManager)
+    case msg: Disconnect =>
+      if (!disconnectDisabled) {
+        super.handleMessage(msg)
+        // the sender of the Disconnect message is treated as the JobManager that went away
+        notifyJobManagerTerminatedListeners(sender())
+      }
+    case DisableDisconnect =>
+      disconnectDisabled = true
+    case msg @ UpdateTaskExecutionState(taskExecutionState) =>
+      super.handleMessage(msg)
+      if (taskExecutionState.getExecutionState == ExecutionState.RUNNING) {
+        // listeners are notified once and then dropped, so they do not accumulate
+        waitForRunning.remove(taskExecutionState.getID) foreach {
+          _ foreach (_ ! decorateMessage(true))
+        }
+      }
+    case RequestLeaderSessionID =>
+      sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
+    case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) =>
+      if (isConnected && currentJobManager.exists(_ == jobManager)) {
+        sender() ! true
+      } else {
+        registerTestListener(waitForRegisteredAtJobManager, jobManager, sender())
+      }
+    case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) =>
+      super.handleMessage(msg)
+      // the acknowledging JobManager is the sender of the registration response
+      waitForRegisteredAtJobManager.remove(sender()) foreach {
+        _ foreach (_ ! true)
+      }
+  }
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 21939d6..553b686 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -28,7 +28,7 @@ import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.jobmanager.JobManager
+import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
 import org.apache.flink.runtime.{LogMessages, LeaderSessionMessageFilter, FlinkActor, StreamingMode}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.instance.{AkkaActorGateway, ActorGateway}
@@ -274,7 +274,9 @@ object TestingUtils {
-        StreamingMode.BATCH_ONLY)
+        StreamingMode.BATCH_ONLY,
+        classOf[JobManager],
+        classOf[MemoryArchivist])
     new AkkaActorGateway(actor, null)
diff --git a/flink-shaded-curator/pom.xml b/flink-shaded-curator/pom.xml
index 3e3894a..ac62cc8 100644
--- a/flink-shaded-curator/pom.xml
+++ b/flink-shaded-curator/pom.xml
@@ -41,45 +41,34 @@ under the License.
+		<!-- Use Flink's Guava version here to avoid too many guava versions in Flink -->
+		<dependency>
+			<groupId></groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
-			<!-- Relocate the Curator's Guava dependency into a different namespace and
-			put create our own apache curator dependency.
-			We can easily integrate curator's netty into the jar file.
-			-->
-						<id>shade-flink</id> <!-- override inherited execution id -->
+						<id>shade-flink</id>
-							<shadedArtifactAttached>false</shadedArtifactAttached>
-							<createDependencyReducedPom>true</createDependencyReducedPom>
-							<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
-							<transformers>
-								<!-- The service transformer is needed to merge META-INF/services files -->
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
-							</transformers>
-							<artifactSet>
-								<includes>
-									<include>org.apache.curator:*</include>
-									<include>*</include>
-								</includes>
+							<artifactSet combine.self="override">
+								<excludes>
+									<exclude>log4j</exclude>
+									<exclude>org.slf4j:slf4j-log4j12</exclude>
+								</excludes>
-							<relocations>
-								<relocation>
-									<pattern></pattern>
-									<shadedPattern></shadedPattern>
-								</relocation>
-							</relocations>

[2/4] flink git commit: [FLINK-2790] [yarn] [ha] Add high availability support for Yarn

Posted by
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/ b/flink-yarn/src/main/java/org/apache/flink/yarn/
new file mode 100644
index 0000000..da6532f
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/
@@ -0,0 +1,873 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+* All classes in this package contain code taken from
+* and
+* and
+* The Flink jar is uploaded to HDFS by this client.
+* The application master and all the TaskManager containers get the jar file downloaded
+* by YARN into their local fs.
+public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
+	/**
+	 * Constants,
+	 * all starting with ENV_ are used as environment variables to pass values from the Client
+	 * to the Application Master.
+	 */
+	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
+	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
+	public final static String ENV_APP_ID = "_APP_ID";
+	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
+	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
+	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
+	public static final String ENV_SLOTS = "_SLOTS";
+	public static final String ENV_DETACHED = "_DETACHED";
+	public static final String ENV_STREAMING_MODE = "_STREAMING_MODE";
+	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
+	/**
+	 * Minimum memory requirements, checked by the Client.
+	 */
+	private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
+	private static final int MIN_TM_MEMORY = 768;
+	private Configuration conf;
+	private YarnClient yarnClient;
+	private YarnClientApplication yarnApplication;
+	/**
+	 * Files (usually in a distributed file system) used for the YARN session of Flink.
+	 * Contains configuration files and jar files.
+	 */
+	private Path sessionFilesDir;
+	/**
+	 * If the user has specified a different number of slots, we store them here
+	 */
+	private int slots = -1;
+	private int jobManagerMemoryMb = 1024;
+	private int taskManagerMemoryMb = 1024;
+	private int taskManagerCount = 1;
+	private String yarnQueue = null;
+	private String configurationDirectory;
+	private Path flinkConfigurationPath;
+	private Path flinkLoggingConfigurationPath; // optional
+	private Path flinkJarPath;
+	private String dynamicPropertiesEncoded;
+	private List<File> shipFiles = new ArrayList<File>();
+	private org.apache.flink.configuration.Configuration flinkConfiguration;
+	private boolean detached;
+	private boolean streamingMode;
+	private String customName = null;
+	public FlinkYarnClientBase() {
+		conf = new YarnConfiguration();
+		if(this.yarnClient == null) {
+			// Create yarnClient
+			yarnClient = YarnClient.createYarnClient();
+			yarnClient.init(conf);
+			yarnClient.start();
+		}
+		// for unit tests only
+		if(System.getenv("IN_TESTS") != null) {
+			try {
+				conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
+			} catch (Throwable t) {
+				throw new RuntimeException("Error",t);
+			}
+		}
+	}
+	/**
+	 * @return the class whose name is placed in the application master's java start command
+	 */
+	protected abstract Class<?> getApplicationMasterClass();
+	/**
+	 * Sets the memory (in MB) of the JobManager / application master container.
+	 *
+	 * @throws IllegalArgumentException if the value is below {@code MIN_JM_MEMORY}
+	 */
+	@Override
+	public void setJobManagerMemory(int memoryMb) {
+		if (memoryMb >= MIN_JM_MEMORY) {
+			this.jobManagerMemoryMb = memoryMb;
+			return;
+		}
+		throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
+			+ "of " + MIN_JM_MEMORY+ " MB");
+	}
+	/**
+	 * Sets the memory (in MB) of each TaskManager container.
+	 *
+	 * @throws IllegalArgumentException if the value is below {@code MIN_TM_MEMORY}
+	 */
+	@Override
+	public void setTaskManagerMemory(int memoryMb) {
+		if (memoryMb >= MIN_TM_MEMORY) {
+			this.taskManagerMemoryMb = memoryMb;
+			return;
+		}
+		throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
+			+ "of " + MIN_TM_MEMORY+ " MB");
+	}
+	@Override
+	public void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf) {
+		this.flinkConfiguration = conf;
+	}
+	@Override
+	public void setTaskManagerSlots(int slots) {
+		if(slots <= 0) {
+			throw new IllegalArgumentException("Number of TaskManager slots must be positive");
+		}
+		this.slots = slots;
+	}
+	@Override
+	public int getTaskManagerSlots() {
+		return this.slots;
+	}
+	@Override
+	public void setQueue(String queue) {
+		this.yarnQueue = queue;
+	}
+	@Override
+	public void setLocalJarPath(Path localJarPath) {
+		if(!localJarPath.toString().endsWith("jar")) {
+			throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
+		}
+		this.flinkJarPath = localJarPath;
+	}
+	@Override
+	public void setConfigurationFilePath(Path confPath) {
+		flinkConfigurationPath = confPath;
+	}
+	public void setConfigurationDirectory(String configurationDirectory) {
+		this.configurationDirectory = configurationDirectory;
+	}
+	@Override
+	public void setFlinkLoggingConfigurationPath(Path logConfPath) {
+		flinkLoggingConfigurationPath = logConfPath;
+	}
+	@Override
+	public Path getFlinkLoggingConfigurationPath() {
+		return flinkLoggingConfigurationPath;
+	}
+	@Override
+	public void setTaskManagerCount(int tmCount) {
+		if(tmCount < 1) {
+			throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
+		}
+		this.taskManagerCount = tmCount;
+	}
+	@Override
+	public int getTaskManagerCount() {
+		return this.taskManagerCount;
+	}
+	@Override
+	public void setShipFiles(List<File> shipFiles) {
+		for(File shipFile: shipFiles) {
+			// remove uberjar from ship list (by default everything in the lib/ folder is added to
+			// the list of files to ship, but we handle the uberjar separately.
+			if(!(shipFile.getName().startsWith("flink-dist-") && shipFile.getName().endsWith("jar"))) {
+				this.shipFiles.add(shipFile);
+			}
+		}
+	}
+	public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
+		this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
+	}
+	@Override
+	public String getDynamicPropertiesEncoded() {
+		return this.dynamicPropertiesEncoded;
+	}
+	/**
+	 * Validates that all settings required for a deployment have been provided.
+	 *
+	 * <p>Additionally logs a warning (but does not fail) if neither {@code HADOOP_CONF_DIR} nor
+	 * {@code YARN_CONF_DIR} is set in the environment.
+	 *
+	 * @throws YarnDeploymentException if the TaskManager count is not positive, or if the Flink
+	 *         jar path, configuration directory, configuration file path or Flink configuration
+	 *         object has not been set
+	 */
+	public void isReadyForDeployment() throws YarnDeploymentException {
+		if(taskManagerCount <= 0) {
+			throw new YarnDeploymentException("Taskmanager count must be positive");
+		}
+		if(this.flinkJarPath == null) {
+			throw new YarnDeploymentException("The Flink jar path is null");
+		}
+		if(this.configurationDirectory == null) {
+			throw new YarnDeploymentException("Configuration directory not set");
+		}
+		if(this.flinkConfigurationPath == null) {
+			throw new YarnDeploymentException("Configuration path not set");
+		}
+		if(this.flinkConfiguration == null) {
+			throw new YarnDeploymentException("Flink configuration object has not been set");
+		}
+		// check if required Hadoop environment variables are set. If not, warn user
+		if(System.getenv("HADOOP_CONF_DIR") == null &&
+			System.getenv("YARN_CONF_DIR") == null) {
+			// note the trailing space: the sentences are concatenated into one log line
+			LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
+				"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
+				"configuration for accessing YARN.");
+		}
+	}
+	public static boolean allocateResource(int[] nodeManagers, int toAllocate) {
+		for(int i = 0; i < nodeManagers.length; i++) {
+			if(nodeManagers[i] >= toAllocate) {
+				nodeManagers[i] -= toAllocate;
+				return true;
+			}
+		}
+		return false;
+	}
+	/** Enables or disables detached mode for the deployment. */
+	@Override
+	public void setDetachedMode(boolean detachedMode) {
+		detached = detachedMode;
+	}
+	/** @return whether detached mode has been enabled */
+	@Override
+	public boolean isDetached() {
+		return this.detached;
+	}
+	/**
+	 * Deploys the Flink cluster on YARN by delegating to {@link #deployInternal()}.
+	 *
+	 * <p>When Hadoop security is enabled, the current user must hold Kerberos credentials and
+	 * the deployment is executed via {@code doAs} as that user; otherwise it runs directly.
+	 *
+	 * @throws YarnDeploymentException if security is enabled but the user has no Kerberos credentials
+	 */
+	public AbstractFlinkYarnCluster deploy() throws Exception {
+		UserGroupInformation.setConfiguration(conf);
+		final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+		if (!UserGroupInformation.isSecurityEnabled()) {
+			return deployInternal();
+		}
+		if (!currentUser.hasKerberosCredentials()) {
+			throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
+				"You may use kinit to authenticate and request a TGT from the Kerberos server.");
+		}
+		return currentUser.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() {
+			@Override
+			public AbstractFlinkYarnCluster run() throws Exception {
+				return deployInternal();
+			}
+		});
+	}
+	/**
+	 * This method will block until the ApplicationMaster/JobManager have been
+	 * deployed on YARN.
+	 */
+	protected AbstractFlinkYarnCluster deployInternal() throws Exception {
+		isReadyForDeployment();
+"Using values:");
+"\tTaskManager count = {}", taskManagerCount);
+"\tJobManager memory = {}", jobManagerMemoryMb);
+"\tTaskManager memory = {}", taskManagerMemoryMb);
+		// Create application via yarnClient
+		yarnApplication = yarnClient.createApplication();
+		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
+		// ------------------ Add dynamic properties to local flinkConfiguraton ------
+		List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded);
+		for (Tuple2<String, String> dynProperty : dynProperties) {
+			flinkConfiguration.setString(dynProperty.f0, dynProperty.f1);
+		}
+		// ------------------ Check if the specified queue exists --------------
+		try {
+			List<QueueInfo> queues = yarnClient.getAllQueues();
+			if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
+				boolean queueFound = false;
+				for (QueueInfo queue : queues) {
+					if (queue.getQueueName().equals(this.yarnQueue)) {
+						queueFound = true;
+						break;
+					}
+				}
+				if (!queueFound) {
+					String queueNames = "";
+					for (QueueInfo queue : queues) {
+						queueNames += queue.getQueueName() + ", ";
+					}
+					LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
+						"Available queues: " + queueNames);
+				}
+			} else {
+				LOG.debug("The YARN cluster does not have any queues configured");
+			}
+		} catch(Throwable e) {
+			LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
+			if(LOG.isDebugEnabled()) {
+				LOG.debug("Error details", e);
+			}
+		}
+		// ------------------ Check if the YARN Cluster has the requested resources --------------
+		// the yarnMinAllocationMB specifies the smallest possible container allocation size.
+		// all allocations below this value are automatically set to this value.
+		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
+		if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
+			LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
+				+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
+				"YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
+				"you requested will start.");
+		}
+		// set the memory to minAllocationMB to do the next checks correctly
+		if(jobManagerMemoryMb < yarnMinAllocationMB) {
+			jobManagerMemoryMb =  yarnMinAllocationMB;
+		}
+		if(taskManagerMemoryMb < yarnMinAllocationMB) {
+			taskManagerMemoryMb =  yarnMinAllocationMB;
+		}
+		Resource maxRes = appResponse.getMaximumResourceCapability();
+		final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
+		if(jobManagerMemoryMb > maxRes.getMemory() ) {
+			failSessionDuringDeployment();
+			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
+				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
+		}
+		if(taskManagerMemoryMb > maxRes.getMemory() ) {
+			failSessionDuringDeployment();
+			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
+				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + NOTE);
+		}
+		final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
+			"connecting from the beginning because the resources are currently not available in the cluster. " +
+			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
+			"the resources become available.";
+		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
+		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
+				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
+		}
+		if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
+			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
+				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
+		}
+		if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
+			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
+				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
+		}
+		// ----------------- check if the requested containers fit into the cluster.
+		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
+		// first, allocate the jobManager somewhere.
+		if(!allocateResource(nmFree, jobManagerMemoryMb)) {
+			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
+				"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
+				Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC);
+		}
+		// allocate TaskManagers
+		for(int i = 0; i < taskManagerCount; i++) {
+			if(!allocateResource(nmFree, taskManagerMemoryMb)) {
+				LOG.warn("There is not enough memory available in the YARN cluster. " +
+					"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
+					"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
+					"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
+					"the following NodeManagers are available: " + Arrays.toString(nmFree)  + NOTE_RSC );
+			}
+		}
+		// ------------------ Prepare Application Master Container  ------------------------------
+		// respect custom JVM options in the YAML file
+		final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+		String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
+		boolean hasLogback = new File(logbackFile).exists();
+		String log4jFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
+		boolean hasLog4j = new File(log4jFile).exists();
+		if(hasLogback) {
+			shipFiles.add(new File(logbackFile));
+		}
+		if(hasLog4j) {
+			shipFiles.add(new File(log4jFile));
+		}
+		// Set up the container launch context for the application master
+		ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+		String amCommand = "$JAVA_HOME/bin/java"
+			+ " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration) + "M " +javaOpts;
+		if(hasLogback || hasLog4j) {
+			amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-main.log\"";
+			if(hasLogback) {
+				amCommand += " -Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
+			}
+			if(hasLog4j) {
+				amCommand += " -Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
+			}
+		}
+		amCommand += " " + getApplicationMasterClass().getName() + " "
+			+ " 1>"
+			+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
+			+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
+		amContainer.setCommands(Collections.singletonList(amCommand));
+		LOG.debug("Application Master start command: " + amCommand);
+		// intialize HDFS
+		// Copy the application master jar to the filesystem
+		// Create a local resource to point to the destination jar path
+		final FileSystem fs = FileSystem.get(conf);
+		// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
+		if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
+			fs.getScheme().startsWith("file")) {
+			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+				+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+				+ "The Flink YARN client needs to store its files in a distributed file system");
+		}
+		// Set-up ApplicationSubmissionContext for the application
+		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
+		if (RecoveryMode.isHighAvailabilityModeActivated(flinkConfiguration)) {
+			// activate re-execution of failed applications
+			appContext.setMaxAppAttempts(
+				flinkConfiguration.getInteger(
+					YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
+			activateHighAvailabilitySupport(appContext);
+		} else {
+			// set number of application retries to 1 in the default case
+			appContext.setMaxAppAttempts(
+				flinkConfiguration.getInteger(
+					1));
+		}
+		final ApplicationId appId = appContext.getApplicationId();
+		// Setup jar for ApplicationMaster
+		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
+		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+		Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
+		Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
+		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
+		localResources.put("flink.jar", appMasterJar);
+		localResources.put("flink-conf.yaml", flinkConf);
+		// setup security tokens (code from apache storm)
+		final Path[] paths = new Path[2 + shipFiles.size()];
+		StringBuilder envShipFileList = new StringBuilder();
+		// upload ship files
+		for (int i = 0; i < shipFiles.size(); i++) {
+			File shipFile = shipFiles.get(i);
+			LocalResource shipResources = Records.newRecord(LocalResource.class);
+			Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
+			paths[2 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
+				shipLocalPath, shipResources, fs.getHomeDirectory());
+			localResources.put(shipFile.getName(), shipResources);
+			envShipFileList.append(paths[2 + i]);
+			if(i+1 < shipFiles.size()) {
+				envShipFileList.append(',');
+			}
+		}
+		paths[0] = remotePathJar;
+		paths[1] = remotePathConf;
+		sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+		fs.setPermission(sessionFilesDir, permission); // set permission for path.
+		Utils.setTokensFor(amContainer, paths, conf);
+		amContainer.setLocalResources(localResources);
+		fs.close();
+		// Setup CLASSPATH for ApplicationMaster
+		Map<String, String> appMasterEnv = new HashMap<String, String>();
+		Utils.setupEnv(conf, appMasterEnv);
+		// set configuration values
+		appMasterEnv.put(FlinkYarnClient.ENV_TM_COUNT, String.valueOf(taskManagerCount));
+		appMasterEnv.put(FlinkYarnClient.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
+		appMasterEnv.put(FlinkYarnClient.FLINK_JAR_PATH, remotePathJar.toString() );
+		appMasterEnv.put(FlinkYarnClient.ENV_APP_ID, appId.toString());
+		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
+		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
+		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
+		appMasterEnv.put(FlinkYarnClient.ENV_SLOTS, String.valueOf(slots));
+		appMasterEnv.put(FlinkYarnClient.ENV_DETACHED, String.valueOf(detached));
+		appMasterEnv.put(FlinkYarnClient.ENV_STREAMING_MODE, String.valueOf(streamingMode));
+		if(dynamicPropertiesEncoded != null) {
+			appMasterEnv.put(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+		}
+		amContainer.setEnvironment(appMasterEnv);
+		// Set up resource type requirements for ApplicationMaster
+		Resource capability = Records.newRecord(Resource.class);
+		capability.setMemory(jobManagerMemoryMb);
+		capability.setVirtualCores(1);
+		String name;
+		if(customName == null) {
+			name = "Flink session with " + taskManagerCount + " TaskManagers";
+			if(detached) {
+				name += " (detached)";
+			}
+		} else {
+			name = customName;
+		}
+		appContext.setApplicationName(name); // application name
+		appContext.setApplicationType("Apache Flink");
+		appContext.setAMContainerSpec(amContainer);
+		appContext.setResource(capability);
+		if(yarnQueue != null) {
+			appContext.setQueue(yarnQueue);
+		}
+"Submitting application master " + appId);
+		yarnClient.submitApplication(appContext);
+"Waiting for the cluster to be allocated");
+		int waittime = 0;
+		loop: while( true ) {
+			ApplicationReport report = yarnClient.getApplicationReport(appId);
+			YarnApplicationState appState = report.getYarnApplicationState();
+			switch(appState) {
+				case FAILED:
+				case FINISHED:
+				case KILLED:
+					throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
+						+ appState + " during deployment. \n" +
+						"Diagnostics from YARN: " + report.getDiagnostics() + "\n" +
+						"If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n" +
+						"yarn logs -applicationId " + appId);
+					//break ..
+				case RUNNING:
+"YARN application has been deployed successfully.");
+					break loop;
+				default:
+"Deploying cluster, current state " + appState);
+					if(waittime > 60000) {
+"Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
+					}
+			}
+			waittime += 1000;
+			Thread.sleep(1000);
+		}
+		// the Flink cluster is deployed in YARN. Represent cluster
+		return new FlinkYarnCluster(yarnClient, appId, conf, flinkConfiguration, sessionFilesDir, detached);
+	}
+	/**
+	 * Kills the YARN application and stops the YARN client.
+	 *
+	 * <p>Use this method to kill the application before it has been properly deployed. Killing is
+	 * best-effort: an Exception thrown by {@code killApplication} is only logged at debug level, and
+	 * the YARN client is stopped afterwards whether or not the kill call threw an Exception.
+	 */
+	private void failSessionDuringDeployment() {
+		// NOTE(review): the next statement looks truncated (only the string argument and the closing
+		// parenthesis remain) — presumably it was a LOG.info(...) call; restore before compiling.
+"Killing YARN application");
+		try {
+			// kill by the application id stored in yarnApplication's new-application response
+			yarnClient.killApplication(yarnApplication.getNewApplicationResponse().getApplicationId());
+		} catch (Exception e) {
+			// we only log a debug message here because the "killApplication" call is a best-effort
+			// call (we don't know if the application has been deployed when the error occurred).
+			LOG.debug("Error while killing YARN application", e);
+		}
+		// stop the client even if the kill attempt above threw an Exception
+		yarnClient.stop();
+	}
+	/**
+	 * Value holder describing the free memory of the YARN cluster, as computed by
+	 * getCurrentFreeClusterResources.
+	 */
+	private static class ClusterResourceDescription {
+		/** Sum of the free memory over all inspected NodeManagers. */
+		public final int totalFreeMemory;
+		/** Largest amount of free memory found on a single NodeManager. */
+		public final int containerLimit;
+		/** Free memory of each inspected NodeManager, in report order. */
+		public final int[] nodeManagersFree;
+		public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
+			this.nodeManagersFree = nodeManagersFree;
+			this.containerLimit = containerLimit;
+			this.totalFreeMemory = totalFreeMemory;
+		}
+	}
+	/**
+	 * Inspects all RUNNING NodeManagers and computes how much memory is still free on them.
+	 * A node without usage information is treated as having nothing in use.
+	 *
+	 * @param yarnClient client used to fetch the node reports
+	 * @return the summed free memory, the largest free amount on a single node, and the per-node values
+	 * @throws YarnException propagated from {@code yarnClient.getNodeReports}
+	 * @throws IOException propagated from {@code yarnClient.getNodeReports}
+	 */
+	private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
+		final List<NodeReport> runningNodes = yarnClient.getNodeReports(NodeState.RUNNING);
+		final int[] freePerNode = new int[runningNodes.size()];
+		int freeSum = 0;
+		int largestFree = 0;
+		int idx = 0;
+		for (NodeReport report : runningNodes) {
+			final int capacity = report.getCapability().getMemory();
+			final int inUse = report.getUsed() != null ? report.getUsed().getMemory() : 0;
+			final int freeOnNode = capacity - inUse;
+			freePerNode[idx++] = freeOnNode;
+			freeSum += freeOnNode;
+			largestFree = Math.max(largestFree, freeOnNode);
+		}
+		return new ClusterResourceDescription(freeSum, largestFree, freePerNode);
+	}
+	/**
+	 * Builds a human-readable description of the YARN cluster.
+	 *
+	 * <p>The description contains the number of NodeManagers, a table with the id, memory, vCores,
+	 * health report and container count of every RUNNING node, the summed memory and vCores of those
+	 * nodes, and one line per queue (capacity, maximum capacity, number of applications).
+	 *
+	 * <p>Note: this method calls {@code yarnClient.stop()} before returning, so callers should not
+	 * rely on the client afterwards.
+	 *
+	 * @return the formatted cluster description
+	 * @throws Exception if one of the YARN queries fails
+	 */
+	public String getClusterDescription() throws Exception {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		PrintStream ps = new PrintStream(baos);
+		YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
+		// println (not append): otherwise the table header below ends up on the same line
+		ps.println("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
+		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+		final String format = "|%-16s |%-16s %n";
+		ps.printf("|Property         |Value          %n");
+		ps.println("+---------------------------------------+");
+		int totalMemory = 0;
+		int totalCores = 0;
+		for(NodeReport rep : nodes) {
+			final Resource res = rep.getCapability();
+			totalMemory += res.getMemory();
+			totalCores += res.getVirtualCores();
+			ps.format(format, "NodeID", rep.getNodeId());
+			ps.format(format, "Memory", res.getMemory() + " MB");
+			ps.format(format, "vCores", res.getVirtualCores());
+			ps.format(format, "HealthReport", rep.getHealthReport());
+			ps.format(format, "Containers", rep.getNumContainers());
+			ps.println("+---------------------------------------+");
+		}
+		ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
+		List<QueueInfo> qInfo = yarnClient.getAllQueues();
+		for(QueueInfo q : qInfo) {
+			ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
+				q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
+		}
+		yarnClient.stop();
+		// make sure everything written through the PrintStream has reached the byte buffer
+		ps.flush();
+		return baos.toString();
+	}
+	/**
+	 * Returns the directory in the Hadoop file system to which the session files were uploaded
+	 * (".flink/&lt;applicationId&gt;/" below the user's home directory).
+	 * NOTE(review): the field is assigned during deployment; calling this earlier presumably fails
+	 * with a NullPointerException — verify the field's initial value.
+	 */
+	public String getSessionFilesDir() {
+		final Path dir = sessionFilesDir;
+		return dir.toString();
+	}
+	/**
+	 * Stores the streaming-mode flag. During deployment it is handed to the ApplicationMaster
+	 * through the {@code FlinkYarnClient.ENV_STREAMING_MODE} environment variable.
+	 *
+	 * @param streamingMode the flag to forward
+	 */
+	@Override
+	public void setStreamingMode(boolean streamingMode) {
+		this.streamingMode = streamingMode;
+	}
+	/**
+	 * Sets a custom YARN application name, used instead of the generated
+	 * "Flink session with N TaskManagers" name.
+	 *
+	 * @param name the application name; must not be null
+	 * @throws IllegalArgumentException if {@code name} is null
+	 */
+	@Override
+	public void setName(String name) {
+		if (name != null) {
+			customName = name;
+			return;
+		}
+		throw new IllegalArgumentException("The passed name is null");
+	}
+	/**
+	 * Enables the YARN features used for high availability, as far as the Hadoop version on the
+	 * classpath supports them. Containers are kept across application attempts, and the
+	 * attempt-failure validity interval is set to the Akka timeout of the Flink configuration,
+	 * in milliseconds.
+	 *
+	 * @param appContext submission context to modify
+	 */
+	private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
+		final ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
+		reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
+		final long validityIntervalMillis = AkkaUtils.getTimeout(flinkConfiguration).toMillis();
+		reflector.setAttemptFailuresValidityInterval(appContext, validityIntervalMillis);
+	}
+	/**
+	 * Singleton that uses reflection to find out whether {@link ApplicationSubmissionContext}
+	 * provides setKeepContainersAcrossApplicationAttempts (Hadoop 2.4.0 onwards) and
+	 * setAttemptFailuresValidityInterval (Hadoop 2.6.0 onwards). Which of them exist depends on the
+	 * Hadoop version on the classpath. Calling a setter whose method is unavailable does nothing
+	 * except write a debug log message.
+	 */
+	private static class ApplicationSubmissionContextReflector {
+		// LOG has to be initialized before the singleton instance because the constructor logs.
+		private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
+		private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
+		private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts";
+		private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval";
+		/** Reflective handle of the keep-containers setter, or null if unsupported. */
+		private final Method keepContainersMethod;
+		/** Reflective handle of the validity-interval setter, or null if unsupported. */
+		private final Method attemptFailuresValidityIntervalMethod;
+		public static ApplicationSubmissionContextReflector getInstance() {
+			return instance;
+		}
+		private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
+			// this method is only supported by Hadoop 2.4.0 onwards
+			this.keepContainersMethod = findMethod(clazz, keepContainersMethodName, boolean.class);
+			// this method is only supported by Hadoop 2.6.0 onwards
+			this.attemptFailuresValidityIntervalMethod = findMethod(clazz, attemptsFailuresValidityIntervalMethodName, long.class);
+		}
+		/**
+		 * Looks up a public single-argument method. Returns null instead of throwing when the
+		 * Hadoop version on the classpath does not provide the method.
+		 */
+		private static Method findMethod(Class<?> clazz, String methodName, Class<?> parameterType) {
+			try {
+				final Method found = clazz.getMethod(methodName, parameterType);
+				LOG.debug("{} supports method {}.", clazz.getCanonicalName(), methodName);
+				return found;
+			} catch (NoSuchMethodException e) {
+				LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), methodName);
+				return null;
+			}
+		}
+		public void setKeepContainersAcrossApplicationAttempts(
+				ApplicationSubmissionContext appContext,
+				boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
+			if (keepContainersMethod == null) {
+				LOG.debug("{} does not support method {}. Doing nothing.",
+					appContext.getClass().getCanonicalName(), keepContainersMethodName);
+				return;
+			}
+			LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(),
+				appContext.getClass().getCanonicalName());
+			keepContainersMethod.invoke(appContext, keepContainers);
+		}
+		public void setAttemptFailuresValidityInterval(
+				ApplicationSubmissionContext appContext,
+				long validityInterval) throws InvocationTargetException, IllegalAccessException {
+			if (attemptFailuresValidityIntervalMethod == null) {
+				LOG.debug("{} does not support method {}. Doing nothing.",
+					appContext.getClass().getCanonicalName(),
+					attemptsFailuresValidityIntervalMethodName);
+				return;
+			}
+			LOG.debug("Calling method {} of {}.",
+				attemptFailuresValidityIntervalMethod.getName(),
+				appContext.getClass().getCanonicalName());
+			attemptFailuresValidityIntervalMethod.invoke(appContext, validityInterval);
+		}
+	}
+	/**
+	 * Signals that deploying the Flink cluster on YARN failed, for example because the YARN
+	 * application switched to FAILED, FINISHED or KILLED while the client waited for it to run.
+	 */
+	public static class YarnDeploymentException extends RuntimeException {
+		private static final long serialVersionUID = -812040641215388943L;
+		/** Creates an exception without detail message or cause. */
+		public YarnDeploymentException() {
+			super();
+		}
+		/** Creates an exception with the given detail message. */
+		public YarnDeploymentException(String message) {
+			super(message);
+		}
+		/** Creates an exception with the given detail message and cause. */
+		public YarnDeploymentException(String message, Throwable cause) {
+			super(message, cause);
+		}
+	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/ b/flink-yarn/src/main/java/org/apache/flink/yarn/
index a1a5205..989ae0f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/
@@ -22,13 +22,17 @@ import;
 import static akka.pattern.Patterns.ask;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.hadoop.conf.Configuration;
@@ -147,19 +151,32 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		actorSystem = AkkaUtils.createActorSystem(flinkConfig,
 				new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0)));
-		// start application client
-"Start application client.");
+		// Create the leader election service
+		flinkConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerAddress.getHostName());
+		flinkConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerAddress.getPort());
-		applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, flinkConfig), "applicationClient");
+		LeaderRetrievalService leaderRetrievalService;
+		try {
+			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+		} catch (Exception e) {
+			throw new IOException("Could not create the leader retrieval service.", e);
+		}
-		// instruct ApplicationClient to start a periodical status polling
-		applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
+		// start application client
+"Start application client.");
+		applicationClient = actorSystem.actorOf(
+			Props.create(
+				ApplicationClient.class,
+				flinkConfig,
+				leaderRetrievalService),
+			"applicationClient");
 		actorRunner = new Thread(new Runnable() {
 			public void run() {
-				// blocks until ApplicationMaster has been stopped
+				// blocks until ApplicationClient has been stopped
 				// get final application report
@@ -206,7 +223,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 			LOG.warn("Error while removing the shutdown hook. The YARN session might be killed unintentionally");
 		// tell the actor to shut down.
-		applicationClient.tell(Messages.getLocalUnregisterClient(), applicationClient);
+		applicationClient.tell(PoisonPill.getInstance(), applicationClient);
 		try {
 			actorRunner.join(1000); // wait for 1 second
@@ -233,7 +250,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 	public void stopAfterJob(JobID jobID) {
 		Preconditions.checkNotNull("The job id must not be null", jobID);
-		Future<Object> messageReceived = ask(applicationClient, new Messages.StopAMAfterJob(jobID), akkaTimeout);
+		Future<Object> messageReceived = ask(applicationClient, new YarnMessages.LocalStopAMAfterJob(jobID), akkaTimeout);
 		try {
 			Await.result(messageReceived, akkaDuration);
 		} catch (Exception e) {
@@ -282,7 +299,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 		if(hasBeenStopped()) {
 			throw new RuntimeException("The FlinkYarnCluster has already been stopped");
-		Future<Object> clusterStatusOption = ask(applicationClient, Messages.LocalGetYarnClusterStatus$.MODULE$, akkaTimeout);
+		Future<Object> clusterStatusOption = ask(applicationClient, YarnMessages.getLocalGetyarnClusterStatus(), akkaTimeout);
 		Object clusterStatus;
 		try {
 			clusterStatus = Await.result(clusterStatusOption, akkaDuration);
@@ -359,7 +376,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 			Object result = null;
 			try {
 				Future<Object> response = Patterns.ask(applicationClient,
-						Messages.getLocalGetYarnMessage(), new Timeout(akkaDuration));
+						YarnMessages.getLocalGetYarnMessage(), new Timeout(akkaDuration));
 				result = Await.result(response, akkaDuration);
 			} catch(Exception ioe) {
@@ -378,8 +395,8 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 				} else {
 					Object obj = messageOption.get();
-					if(obj instanceof Messages.YarnMessage) {
-						Messages.YarnMessage msg = (Messages.YarnMessage) obj;
+					if(obj instanceof YarnMessages.YarnMessage) {
+						YarnMessages.YarnMessage msg = (YarnMessages.YarnMessage) obj;
 						ret.add("[" + + "] " + msg.message());
 					} else {
 						LOG.warn("LocalGetYarnMessage returned unexpected type: " + messageOption);
@@ -425,7 +442,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 						finalStatus = FinalApplicationStatus.SUCCEEDED;
 					Future<Object> response = Patterns.ask(applicationClient,
-							new Messages.StopYarnSession(finalStatus,
+							new YarnMessages.LocalStopYarnSession(finalStatus,
 									"Flink YARN Client requested shutdown"),
 							new Timeout(akkaDuration));
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/ b/flink-yarn/src/main/java/org/apache/flink/yarn/
new file mode 100644
index 0000000..d05b658
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/
@@ -0,0 +1,113 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+import java.util.Map;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.yarn.YarnTaskManager;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+ * The entry point for running a TaskManager in a YARN container. The YARN container will invoke
+ * this class' main method.
+ */
+public class YarnTaskManagerRunner {
+	private static final Logger LOG = LoggerFactory.getLogger(YarnTaskManagerRunner.class);
+	/**
+	 * Starts a TaskManager of the given class inside a YARN container.
+	 *
+	 * <p>The method does the following, in order:
+	 * <ul>
+	 *   <li>Parses the command line into a Flink configuration and a streaming mode.</li>
+	 *   <li>Points the TaskManager's temporary directories to YARN's LOCAL_DIRS, unless the Flink
+	 *   configuration already sets them.</li>
+	 *   <li>Enables JVM exit on fatal Akka errors.</li>
+	 *   <li>Runs the TaskManager as the user named in the {@code FlinkYarnClient.ENV_CLIENT_USERNAME}
+	 *   environment variable, copying over the tokens of the current user.</li>
+	 * </ul>
+	 * If argument parsing or the TaskManager startup throws, the JVM exits with
+	 * {@code TaskManager.STARTUP_FAILURE_RETURN_CODE()}.
+	 *
+	 * @param args command line arguments of the container
+	 * @param taskManager class of the TaskManager to run; handed to
+	 *                    {@code TaskManager.selectNetworkInterfaceAndRunTaskManager}
+	 * @param <T> concrete {@link YarnTaskManager} type
+	 * @throws IOException presumably propagated from the UserGroupInformation calls — verify
+	 */
+	public static <T extends YarnTaskManager> void runYarnTaskManager(String[] args, final Class<T> taskManager) throws IOException {
+		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager", args);
+		EnvironmentInformation.checkJavaVersion();
+		org.apache.flink.runtime.util.SignalHandler.register(LOG);
+		// try to parse the command line arguments
+		final Configuration configuration;
+		final StreamingMode mode;
+		try {
+			scala.Tuple2<Configuration, StreamingMode> res = TaskManager.parseArgsAndLoadConfig(args);
+			configuration = res._1();
+			mode = res._2();
+		}
+		catch (Throwable t) {
+			LOG.error(t.getMessage(), t);
+			System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+			// unreachable after System.exit, but needed so the compiler sees the finals as assigned
+			return;
+		}
+		// read the environment variables for YARN
+		final Map<String, String> envs = System.getenv();
+		final String yarnClientUsername = envs.get(FlinkYarnClient.ENV_CLIENT_USERNAME);
+		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+		// configure local directory: YARN's LOCAL_DIRS are used only if Flink has none configured
+		String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
+		if (flinkTempDirs == null) {
+			// NOTE(review): the next statement looks truncated — presumably a LOG.info( prefix was lost.
+"Setting directories for temporary file " + localDirs);
+			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, localDirs);
+		}
+		else {
+			// NOTE(review): the next statement looks truncated — presumably a LOG.info( prefix was lost.
+"Overriding YARN's temporary file directories with those " +
+				"specified in the Flink config: " + flinkTempDirs);
+		}
+		// NOTE(review): the next statement looks truncated — presumably a LOG.info( prefix was lost.
+"YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() +
+			"' setting user to execute Flink TaskManager to '" + yarnClientUsername + "'");
+		// tell akka to die in case of an error
+		configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+		// NOTE(review): yarnClientUsername is null if ENV_CLIENT_USERNAME is unset; confirm how
+		// createRemoteUser reacts to that.
+		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+		// hand the container user's tokens over to the client user
+		for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
+			ugi.addToken(toks);
+		}
+		ugi.doAs(new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				try {
+					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration,
+						mode, taskManager);
+				}
+				catch (Throwable t) {
+					LOG.error("Error while starting the TaskManager", t);
+					System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
+				}
+				return null;
+			}
+		});
+	}
+	/**
+	 * Entry point invoked by the YARN container; runs a plain {@link YarnTaskManager}.
+	 *
+	 * @param args command line arguments of the container
+	 * @throws IOException propagated from {@code runYarnTaskManager}
+	 */
+	public static void main(final String[] args) throws IOException {
+		final Class<YarnTaskManager> taskManagerClass = YarnTaskManager.class;
+		runYarnTaskManager(args, taskManagerClass);
+	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/
deleted file mode 100644
index 1a13f93..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/
+++ /dev/null
@@ -1,110 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.yarn.appMaster;
-import java.util.Map;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.yarn.YarnTaskManager;
-import org.apache.flink.yarn.FlinkYarnClient;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
- * The entry point for running a TaskManager in a YARN container. The YARN container will invoke
- * this class' main method.
- */
-public class YarnTaskManagerRunner {
-	private static final Logger LOG = LoggerFactory.getLogger(YarnTaskManagerRunner.class);
-	public static void main(final String[] args) throws IOException {
-		EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager", args);
-		EnvironmentInformation.checkJavaVersion();
-		org.apache.flink.runtime.util.SignalHandler.register(LOG);
-		// try to parse the command line arguments
-		final Configuration configuration;
-		final StreamingMode mode;
-		try {
-			scala.Tuple2<Configuration, StreamingMode> res = TaskManager.parseArgsAndLoadConfig(args);
-			configuration = res._1();
-			mode = res._2();
-		}
-		catch (Throwable t) {
-			LOG.error(t.getMessage(), t);
-			System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-			return;
-		}
-		// read the environment variables for YARN
-		final Map<String, String> envs = System.getenv();
-		final String yarnClientUsername = envs.get(FlinkYarnClient.ENV_CLIENT_USERNAME);
-		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
-		// configure local directory
-		String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
-		if (flinkTempDirs == null) {
-"Setting directories for temporary file " + localDirs);
-			configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, localDirs);
-		}
-		else {
-"Overriding YARN's temporary file directories with those " +
-					"specified in the Flink config: " + flinkTempDirs);
-		}
-"YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() +
-				"' setting user to execute Flink TaskManager to '" + yarnClientUsername + "'");
-		// tell akka to die in case of an error
-		configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
-		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-		for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-			ugi.addToken(toks);
-		}
-		ugi.doAs(new PrivilegedAction<Object>() {
-			@Override
-			public Object run() {
-				try {
-					TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration,
-																		mode, YarnTaskManager.class);
-				}
-				catch (Throwable t) {
-					LOG.error("Error while starting the TaskManager", t);
-					System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-				}
-				return null;
-			}
-		});
-	}
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index 5ae814f..79717ef 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -18,28 +18,36 @@
 package org.apache.flink.yarn
+import java.util.UUID
-import akka.pattern.ask
 import grizzled.slf4j.Logger
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
-import org.apache.flink.runtime.util.LeaderRetrievalUtils
-import org.apache.flink.runtime.util.LeaderRetrievalUtils.LeaderGatewayListener
-import org.apache.flink.runtime.{FlinkActor, LogMessages}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
+import org.apache.flink.runtime.{LeaderSessionMessageFilter, FlinkActor, LogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus
-import org.apache.flink.yarn.Messages._
+import org.apache.flink.yarn.YarnMessages._
 import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.language.postfixOps
-import scala.util.{Failure, Success}
-class ApplicationClient(flinkConfig: Configuration)
-  extends FlinkActor with LogMessages {
+/** Actor which is responsible to repeatedly poll the Yarn cluster status from the JobManager.
+  *
+  * This class represents the bridge between the [[FlinkYarnCluster]] and the [[YarnJobManager]].
+  *
+  * @param flinkConfig Configuration object
+  * @param leaderRetrievalService [[LeaderRetrievalService]] which is used to retrieve the current
+  *                              leading [[YarnJobManager]]
+  */
+class ApplicationClient(
+    val flinkConfig: Configuration,
+    val leaderRetrievalService: LeaderRetrievalService)
+  extends FlinkActor
+  with LeaderSessionMessageFilter
+  with LogMessages
+  with LeaderRetrievalListener{
   import context._
   val log = Logger(getClass)
@@ -50,25 +58,36 @@ class ApplicationClient(flinkConfig: Configuration)
   var yarnJobManager: Option[ActorRef] = None
   var pollingTimer: Option[Cancellable] = None
-  implicit var timeout: FiniteDuration = 0 seconds
+  implicit val timeout: FiniteDuration = AkkaUtils.getTimeout(flinkConfig)
   var running = false
   var messagesQueue : mutable.Queue[YarnMessage] = mutable.Queue[YarnMessage]()
   var latestClusterStatus : Option[FlinkYarnClusterStatus] = None
   var stopMessageReceiver : Option[ActorRef] = None
+  var leaderSessionID: Option[UUID] = None
   override def preStart(): Unit = {
-    timeout = AkkaUtils.getTimeout(flinkConfig)
+    try {
+      leaderRetrievalService.start(this)
+    } catch {
+      case e: Exception =>
+        log.error("Could not start the leader retrieval service.", e)
+        throw new RuntimeException("Could not start the leader retrieval service.", e)
+    }
   override def postStop(): Unit = {"Stopped Application client.")
-    pollingTimer foreach {
-      _.cancel()
-    }
-    pollingTimer = None
+    disconnectFromJobManager()
+    try {
+      leaderRetrievalService.stop()
+    } catch {
+      case e: Exception => log.error("Leader retrieval service did not shout down properly.")
+    }
     // Terminate the whole actor system because there is only the application client running
@@ -76,39 +95,50 @@ class ApplicationClient(flinkConfig: Configuration)
   override def handleMessage: Receive = {
     // ----------------------------- Registration -> Status updates -> shutdown ----------------
-    case LocalRegisterClient(address: InetSocketAddress) =>
-      flinkConfig.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
-      flinkConfig.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
-      val leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig)
-      val listener = new LeaderGatewayListener(context.system, timeout);
-      leaderRetrievalService.start(listener)
-      val jobManagerGatewayFuture = listener.getActorGatewayFuture
-      jobManagerGatewayFuture.onComplete {
-        case Success(gateway) => self ! decorateMessage(JobManagerActorRef(
-        case Failure(t) =>
-          log.error("Registration at JobManager/ApplicationMaster failed. Shutting " +
-            "ApplicationClient down.", t)
+    case TriggerApplicationClientRegistration(jobManagerAkkaURL, currentTimeout, deadline) =>
+      if (isConnected) {
+        // we are already connected to the job manager
+        log.debug("ApplicationClient is already registered to the " +
+          s"JobManager ${yarnJobManager.get}.")
+      } else {
+        if (deadline.forall(_.isOverdue())) {
+          // we failed to register in time. That means we should quit
+          log.error(s"Failed to register at the JobManager with address $jobManagerAkkaURL. " +
+            s"Shutting down...")
-          // we could not connect to the job manager --> poison ourselves
           self ! decorateMessage(PoisonPill)
+        } else {
+"Trying to register at JobManager $jobManagerAkkaURL.")
+          val jobManager = context.actorSelection(jobManagerAkkaURL)
+          jobManager ! decorateMessage(
+            RegisterApplicationClient
+          )
+          val nextTimeout = (currentTimeout * 2).min(ApplicationClient.MAX_REGISTRATION_TIMEOUT)
+          context.system.scheduler.scheduleOnce(
+            currentTimeout,
+            self,
+            decorateMessage(
+              TriggerApplicationClientRegistration(
+                jobManagerAkkaURL,
+                nextTimeout,
+                deadline
+              )
+            )
+          )(context.dispatcher)
+        }
-    case JobManagerActorRef(jm) =>
-      yarnJobManager = Some(jm)
+    case AcknowledgeApplicationClientRegistration =>
+      val jm = sender()
-      // the message came from the FlinkYarnCluster. We send the message to the JobManager.
-      // it is important not to forward the message because the JobManager is storing the
-      // sender as the Application Client (this class).
-      (jm ? decorateMessage(RegisterClient(self)))(timeout).onFailure{
-        case t: Throwable =>
-          log.error("Could not register at the job manager.", t)
-          self ! decorateMessage(PoisonPill)
-      }
+"Successfully registered at the JobManager $jm")
+      yarnJobManager = Some(jm)
       // schedule a periodic status report from the JobManager
       // request the number of task managers and slots from the job manager
@@ -116,23 +146,42 @@ class ApplicationClient(flinkConfig: Configuration)
-          jm,
-          PollYarnClusterStatus)
+          yarnJobManager.get,
+          decorateMessage(PollYarnClusterStatus))
-    case LocalUnregisterClient =>
-      // unregister client from AM
-      yarnJobManager foreach {
-        _ ! decorateMessage(UnregisterClient)
+    case JobManagerLeaderAddress(jobManagerAkkaURL, newLeaderSessionID) =>
+"Received address of new leader $jobManagerAkkaURL with session ID" +
+        s" $newLeaderSessionID.")
+      disconnectFromJobManager()
+      leaderSessionID = Option(newLeaderSessionID)
+      Option(jobManagerAkkaURL).foreach{
+        akkaURL =>
+          if (akkaURL.nonEmpty) {
+            val maxRegistrationDuration = ApplicationClient.MAX_REGISTRATION_DURATION
+            val deadline = if (maxRegistrationDuration.isFinite()) {
+              Some(maxRegistrationDuration.fromNow)
+            } else {
+              None
+            }
+            // trigger registration at new leader
+            self ! decorateMessage(
+              TriggerApplicationClientRegistration(
+                akkaURL,
+                ApplicationClient.INITIAL_REGISTRATION_TIMEOUT,
+                deadline))
+          }
-      // poison ourselves
-      self ! decorateMessage(PoisonPill)
-    case msg: StopYarnSession =>
+    case LocalStopYarnSession(status, diagnostics) =>"Sending StopYarnSession request to ApplicationMaster.")
-      stopMessageReceiver = Some(sender)
+      stopMessageReceiver = Some(sender())
       yarnJobManager foreach {
-        _ forward decorateMessage(msg)
+        _ ! decorateMessage(StopYarnSession(status, diagnostics))
     case JobManagerStopped =>
@@ -154,9 +203,9 @@ class ApplicationClient(flinkConfig: Configuration)
       sender() ! decorateMessage(latestClusterStatus)
     // Forward message to Application Master
-    case msg: StopAMAfterJob =>
+    case LocalStopAMAfterJob(jobID) =>
       yarnJobManager foreach {
-        _ forward decorateMessage(msg)
+        _ forward decorateMessage(StopAMAfterJob(jobID))
     // -----------------  handle messages from the cluster -------------------
@@ -167,13 +216,41 @@ class ApplicationClient(flinkConfig: Configuration)
     // locally forward messages
     case LocalGetYarnMessage =>
-      if(messagesQueue.size > 0) {
-        sender() ! decorateMessage(Option(messagesQueue.dequeue))
+      if(messagesQueue.nonEmpty) {
+        sender() ! decorateMessage(Option(messagesQueue.dequeue()))
       } else {
         sender() ! decorateMessage(None)
+  /** Disconnects this [[ApplicationClient]] from the connected [[YarnJobManager]] and cancels
+    * the polling timer.
+    *
+    */
+  def disconnectFromJobManager(): Unit = {
+"Disconnect from JobManager ${yarnJobManager.getOrElse(ActorRef.noSender)}.")
+    yarnJobManager foreach {
+      _ ! decorateMessage(UnregisterClient)
+    }
+    pollingTimer foreach {
+      _.cancel()
+    }
+    yarnJobManager = None
+    leaderSessionID = None
+    pollingTimer = None
+  }
+  /** True if the [[ApplicationClient]] is connected to the [[YarnJobManager]]
+    *
+    * @return true if the client is connected to the JobManager, otherwise false
+    */
+  def isConnected: Boolean = {
+    yarnJobManager.isDefined
+  }
    * Handle unmatched messages with an exception.
@@ -181,4 +258,23 @@ class ApplicationClient(flinkConfig: Configuration)
     // let the actor crash
     throw new RuntimeException("Received unknown message " + message)
+  override def notifyLeaderAddress(leaderAddress: String, leaderSessionID: UUID): Unit = {
+"Notification about new leader address $leaderAddress with " +
+      s"session ID $leaderSessionID.")
+    self ! JobManagerLeaderAddress(leaderAddress, leaderSessionID)
+  }
+  override def handleError(exception: Exception): Unit = {
+    log.error("Error in leader retrieval service.", exception)
+    // in case of an error in the LeaderRetrievalService, we shut down the ApplicationClient
+    self ! decorateMessage(PoisonPill)
+  }
+object ApplicationClient {
+  val INITIAL_REGISTRATION_TIMEOUT: FiniteDuration = 500 milliseconds
+  val MAX_REGISTRATION_DURATION: FiniteDuration = 5 minutes
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 869c643..ad162fe 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -17,293 +17,21 @@
 package org.apache.flink.yarn
-import{PrintWriter, FileWriter, BufferedWriter}
-import grizzled.slf4j.Logger
-import org.apache.flink.client.CliFrontend
-import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
-import org.apache.flink.runtime.StreamingMode
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobmanager.JobManager
-import org.apache.flink.runtime.jobmanager.web.WebInfoServer
-import org.apache.flink.runtime.util.{StandaloneUtils, EnvironmentInformation}
-import org.apache.flink.runtime.webmonitor.WebMonitor
-import org.apache.flink.yarn.Messages.StartYarnSession
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import scala.collection.JavaConversions._
+import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManager}
+/** Default implemenation of the [[ApplicationMasterBase]] which starts a [[YarnJobManager]] and a
+  * [[MemoryArchivist]].
+  */
+class ApplicationMaster extends ApplicationMasterBase {
+  override def getJobManagerClass: Class[_ <: JobManager] = classOf[YarnJobManager]
+  override def getArchivistClass: Class[_ <: MemoryArchivist] = classOf[MemoryArchivist]
 object ApplicationMaster {
-  val LOG = Logger(getClass)
-  val CONF_FILE = "flink-conf.yaml"
-  val MODIFIED_CONF_FILE = "flink-conf-modified.yaml"
-  val MAX_REGISTRATION_DURATION = "5 minutes"
   def main(args: Array[String]): Unit = {
-    val yarnClientUsername = System.getenv(FlinkYarnClient.ENV_CLIENT_USERNAME)
-"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName} " +
-      s"setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}")
-    EnvironmentInformation.logEnvironmentInfo(LOG.logger, "YARN ApplicationMaster/JobManager", args)
-    EnvironmentInformation.checkJavaVersion()
-    org.apache.flink.runtime.util.SignalHandler.register(LOG.logger)
-    var streamingMode = StreamingMode.BATCH_ONLY
-    val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
-    for(token <- UserGroupInformation.getCurrentUser.getTokens){
-      ugi.addToken(token)
-    }
-    ugi.doAs(new PrivilegedAction[Object] {
-      override def run(): Object = {
-        var actorSystem: ActorSystem = null
-        var webserver: WebMonitor = null
-        try {
-          val conf = new YarnConfiguration()
-          val env = System.getenv()
-          if (LOG.isDebugEnabled) {
-            LOG.debug("All environment variables: " + env.toString)
-          }
-          val currDir = env.get(Environment.PWD.key())
-          require(currDir != null, "Current directory unknown.")
-          val logDirs = env.get(Environment.LOG_DIRS.key())
-          if(hasStreamingMode(env)) {
-  "Starting ApplicationMaster/JobManager in streaming mode")
-            streamingMode = StreamingMode.STREAMING
-          }
-          // Note that we use the "ownHostname" given by YARN here, to make sure
-          // we use the hostnames given by YARN consistently throughout akka.
-          // for akka "localhost" and "localhost.localdomain" are different actors.
-          val ownHostname = env.get(Environment.NM_HOST.key())
-          require(ownHostname != null, "Own hostname in YARN not set.")
-          val taskManagerCount = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt
-          val slots = env.get(FlinkYarnClient.ENV_SLOTS).toInt
-          val dynamicPropertiesEncodedString = env.get(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES)
-          val config = createConfiguration(currDir, dynamicPropertiesEncodedString)
-          val (
-            system: ActorSystem,
-            jobManager: ActorRef,
-            archiver: ActorRef) = startJobManager(
-              config,
-              ownHostname,
-              streamingMode)
-          actorSystem = system
-          val address = AkkaUtils.getAddress(actorSystem)
-          val jobManagerPort = address.port.get
-          if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
-            // start the web info server
-  "Starting Job Manger web frontend.")
-            config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
-            config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0.
-            // set JobManager host/port for web interface.
-            config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, ownHostname)
-            config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort)
-            // TODO: Add support for HA: Make web server work independently from the JM
-            val leaderRetrievalService = StandaloneUtils.createLeaderRetrievalService(config)
-            webserver = if(
-              config.getBoolean(
-                ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY,
-                false)) {
-              JobManager.startWebRuntimeMonitor(config, leaderRetrievalService, actorSystem)
-            } else {
-              new WebInfoServer(config, leaderRetrievalService, actorSystem)
-            }
-            webserver.start()
-          }
-          val jobManagerWebPort = if (webserver == null) {
-            LOG.warn("Web server is null. It will not be accessible through YARN")
-            -1
-          } else webserver.getServerPort
-          // generate configuration file for TaskManagers
-          generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, ownHostname,
-            jobManagerPort, jobManagerWebPort, logDirs, slots, taskManagerCount,
-            dynamicPropertiesEncodedString)
-          // send "start yarn session" message to YarnJobManager.
-"Starting YARN session on Job Manager.")
-          jobManager ! StartYarnSession(conf, jobManagerPort, jobManagerWebPort)
-"Application Master properly initiated. Awaiting termination of actor system.")
-          actorSystem.awaitTermination()
-        }
-        catch {
-          case t: Throwable =>
-            LOG.error("Error while running the application master.", t)
-            if (actorSystem != null) {
-              actorSystem.shutdown()
-              actorSystem.awaitTermination()
-            }
-        }
-        finally {
-          if (webserver != null) {
-            LOG.debug("Stopping Job Manager web frontend.")
-            webserver.stop()
-          }
-        }
-        null
-      }
-    })
-  }
-  def generateConfigurationFile(
-      fileName: String,
-      currDir: String,
-      ownHostname: String,
-      jobManagerPort: Int,
-      jobManagerWebPort: Int,
-      logDirs: String,
-      slots: Int,
-      taskManagerCount: Int,
-      dynamicPropertiesEncodedString: String)
-    : Unit = {
-"Generate configuration file for application master.")
-    val output = new PrintWriter(new BufferedWriter(
-      new FileWriter(fileName))
-    )
-    for (line <- Source.fromFile(s"$currDir/$CONF_FILE").getLines() if !(line.contains
-      (ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY))) {
-      output.println(line)
-    }
-    output.println(s"${ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY}: $ownHostname")
-    output.println(s"${ConfigConstants.JOB_MANAGER_IPC_PORT_KEY}: $jobManagerPort")
-    output.println(s"${ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY}: $logDirs")
-    output.println(s"${ConfigConstants.JOB_MANAGER_WEB_PORT_KEY}: $jobManagerWebPort")
-    if(slots != -1){
-      output.println(s"${ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS}: $slots")
-      output.println(
-        s"${ConfigConstants.DEFAULT_PARALLELISM_KEY}: ${slots*taskManagerCount}")
-    }
-    output.println(s"${ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION}: " +
-    // add dynamic properties
-    val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
-    import scala.collection.JavaConverters._
-    for(property <- dynamicProperties.asScala){
-      output.println(s"${property.f0}: ${property.f1}")
-    }
-    output.close()
-  }
-  /**
-   * Starts the JobManager and all its components.
-   *
-   * @return (Configuration, JobManager ActorSystem, JobManager ActorRef, Archiver ActorRef)
-   */
-  def startJobManager(
-      configuration: Configuration,
-      hostname: String,
-      streamingMode: StreamingMode)
-    : (ActorSystem, ActorRef, ActorRef) = {
-"Starting JobManager for YARN")
-    // set port to 0 to let Akka automatically determine the port.
-    LOG.debug("Starting JobManager actor system")
-    val jobManagerSystem = AkkaUtils.createActorSystem(configuration, Some((hostname, 0)))
-    // start all the components inside the job manager
-    LOG.debug("Starting JobManager components")
-    val (executionContext,
-      instanceManager,
-      scheduler,
-      libraryCacheManager,
-      archiveProps,
-      executionRetries,
-      delayBetweenRetries,
-      timeout,
-      _,
-      leaderElectionService) = JobManager.createJobManagerComponents(configuration)
-    // start the archiver
-    val archiver: ActorRef = jobManagerSystem.actorOf(archiveProps, JobManager.ARCHIVE_NAME)
-    val jobManagerProps = Props(
-      new JobManager(
-        configuration,
-        executionContext,
-        instanceManager,
-        scheduler,
-        libraryCacheManager,
-        archiver,
-        executionRetries,
-        delayBetweenRetries,
-        timeout,
-        streamingMode,
-        leaderElectionService)
-      with ApplicationMasterActor)
-    LOG.debug("Starting JobManager actor")
-    val jobManager = JobManager.startActor(jobManagerProps, jobManagerSystem)
-    (jobManagerSystem, jobManager, archiver)
-  }
-  def createConfiguration(curDir: String, dynamicPropertiesEncodedString: String): Configuration = {
-"Loading config from: $curDir.")
-    GlobalConfiguration.loadConfiguration(curDir)
-    val configuration = GlobalConfiguration.getConfiguration()
-    configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, curDir)
-    // add dynamic properties to JobManager configuration.
-    val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
-    import scala.collection.JavaConverters._
-    for(property <- dynamicProperties.asScala){
-      configuration.setString(property.f0, property.f1)
-    }
-    configuration
-  }
+    val applicationMaster = new ApplicationMaster
-  def hasStreamingMode(env: java.util.Map[String, String]): Boolean = {
-    val sModeString = env.get(FlinkYarnClient.ENV_STREAMING_MODE)
-    if(sModeString != null) {
-      return sModeString.toBoolean
-    }
-    false