You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/07 17:34:41 UTC

git commit: SAMZA-58; Use YARN's AMRMClientAsync client library

Repository: incubator-samza
Updated Branches:
  refs/heads/master bd2fb6776 -> 4e5573b17


SAMZA-58; Use YARN's AMRMClientAsync client library


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4e5573b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4e5573b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4e5573b1

Branch: refs/heads/master
Commit: 4e5573b172a76e8d1413efb4152f2976ea54304f
Parents: bd2fb67
Author: Zhijie Shen <zs...@hortonworks.com>
Authored: Mon Apr 7 08:34:32 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Mon Apr 7 08:34:32 2014 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../org/apache/samza/config/YarnConfig.scala    |   6 +
 .../apache/samza/job/yarn/SamzaAppMaster.scala  |  95 +++++++++---
 .../job/yarn/SamzaAppMasterLifecycle.scala      |  16 +-
 .../samza/job/yarn/SamzaAppMasterMetrics.scala  |   5 -
 .../job/yarn/SamzaAppMasterTaskManager.scala    |  59 ++++----
 .../apache/samza/job/yarn/YarnAppMaster.scala   |  85 -----------
 .../samza/job/yarn/YarnAppMasterListener.scala  |   5 -
 .../samza/job/yarn/TestSamzaAppMaster.scala     | 150 +++++++++++++++++++
 .../job/yarn/TestSamzaAppMasterLifecycle.scala  |  50 +++----
 .../yarn/TestSamzaAppMasterTaskManager.scala    | 106 ++++++-------
 .../samza/job/yarn/TestYarnAppMaster.scala      | 142 ------------------
 12 files changed, 347 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 75b6c98..b54cee6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -151,6 +151,7 @@ project(":samza-yarn_$scalaVersion") {
       exclude module: 'slf4j-api'
     }
     testCompile "junit:junit:$junitVersion"
+    testCompile "org.mockito:mockito-all:$mockitoVersion"
   }
 
   repositories {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
index 6c3aa92..5756d3a 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
@@ -30,6 +30,7 @@ object YarnConfig {
   val AM_JVM_OPTIONS = "yarn.am.opts"
   val AM_JMX_ENABLED = "yarn.am.jmx.enabled"
   val AM_CONTAINER_MAX_MEMORY_MB = "yarn.am.container.memory.mb"
+  val AM_POLL_INTERVAL_MS = "yarn.am.poll.interval.ms"
 
   implicit def Config2Yarn(config: Config) = new YarnConfig(config)
 }
@@ -69,5 +70,10 @@ class YarnConfig(config: Config) extends ScalaMapConfig(config) {
     case _ => None
   }
 
+  def getAMPollIntervalMs: Option[Int] = getOption(YarnConfig.AM_POLL_INTERVAL_MS) match {
+    case Some(interval) => Some(interval.toInt)
+    case _ => None
+  }
+
   def getJmxServerEnabled = getBoolean(YarnConfig.AM_JMX_ENABLED, true)
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index de6887d..c28c9a6 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -18,21 +18,27 @@
  */
 
 package org.apache.samza.job.yarn
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.YarnConfig
-import org.apache.samza.config.serializers.JsonConfigSerializer
+
+import scala.collection.JavaConversions.asScalaBuffer
+
 import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.records.{ Container, ContainerStatus, NodeReport }
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
-import scala.collection.JavaConversions._
-import org.apache.samza.metrics.{ JmxServer, MetricsRegistryMap }
-import grizzled.slf4j.Logging
-import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
-import org.apache.samza.config.YarnConfig._
-import org.apache.samza.job.yarn.SamzaAppMasterTaskManager._
-import org.apache.samza.util.hadoop.HttpFileSystem
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.samza.config.MapConfig
 import org.apache.samza.config.ShellCommandConfig
+import org.apache.samza.config.YarnConfig
+import org.apache.samza.config.YarnConfig.Config2Yarn
+import org.apache.samza.config.serializers.JsonConfigSerializer
+import org.apache.samza.job.yarn.SamzaAppMasterTaskManager.DEFAULT_CONTAINER_MEM
+import org.apache.samza.job.yarn.SamzaAppMasterTaskManager.DEFAULT_CPU_CORES
+import org.apache.samza.metrics.JmxServer
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.util.hadoop.HttpFileSystem
+
+import grizzled.slf4j.Logging
 
 /**
  * When YARN executes an application master, it needs a bash command to
@@ -45,7 +51,12 @@ import org.apache.samza.config.ShellCommandConfig
  * YARN client, and YARN itself), and wires up everything to run Samza's
  * application master.
  */
-object SamzaAppMaster extends Logging {
+object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler {
+  val DEFAULT_POLL_INTERVAL_MS: Int = 1000
+  var state: SamzaAppMasterState = null
+  var listeners: List[YarnAppMasterListener] = null
+  var storedException: Throwable = null
+
   def main(args: Array[String]) {
     val containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString)
     info("got container id: %s" format containerIdStr)
@@ -62,7 +73,8 @@ object SamzaAppMaster extends Logging {
     info("got config: %s" format config)
     val hConfig = new YarnConfiguration
     hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)
-    val amClient = new AMRMClientImpl[ContainerRequest]
+    val interval = config.getAMPollIntervalMs.getOrElse(DEFAULT_POLL_INTERVAL_MS)
+    val amClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](interval, this)
     val clientHelper = new ClientHelper(hConfig)
     val registry = new MetricsRegistryMap
     val containerMem = config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM)
@@ -71,14 +83,13 @@ object SamzaAppMaster extends Logging {
 
     try {
       // wire up all of the yarn event listeners
-      val state = new SamzaAppMasterState(-1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
+      state = new SamzaAppMasterState(-1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
       val service = new SamzaAppMasterService(config, state, registry, clientHelper)
-      val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient, hConfig)
+      val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient)
       val metrics = new SamzaAppMasterMetrics(config, state, registry)
       val am = new SamzaAppMasterTaskManager({ System.currentTimeMillis }, config, state, amClient, hConfig)
-
-      // run the app master
-      new YarnAppMaster(List(state, service, lifecycle, metrics, am), amClient).run
+      listeners = List(state, service, lifecycle, metrics, am)
+      run(amClient, listeners, hConfig, interval)
     } finally {
       // jmxServer has to be stopped or will prevent process from exiting.
       if (jmxServer.isDefined) {
@@ -86,4 +97,52 @@ object SamzaAppMaster extends Logging {
       }
     }
   }
+
+  def run(amClient: AMRMClientAsync[ContainerRequest], listeners: List[YarnAppMasterListener], hConfig: YarnConfiguration, interval: Int): Unit = {
+    try {
+      amClient.init(hConfig)
+      amClient.start
+      listeners.foreach(_.onInit)
+      var isShutdown: Boolean = false
+      // loop to keep the process alive until the job should shut down or an error occurs on amClient
+      while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || _) && storedException == null) {
+        try {
+          Thread.sleep(interval)
+        } catch {
+          case e: InterruptedException => {
+            isShutdown = true
+            info("got interrupt in app master thread, so shutting down")
+          }
+        }
+      }
+    } finally {
+      // listeners have to be stopped
+      listeners.foreach(listener => try {
+        listener.onShutdown
+      } catch {
+        case e: Exception => warn("Listener %s failed to shutdown." format listener, e)
+      })
+      // amClient has to be stopped
+      amClient.stop
+    }
+  }
+
+  override def onContainersCompleted(statuses: java.util.List[ContainerStatus]): Unit =
+    statuses.foreach(containerStatus => listeners.foreach(_.onContainerCompleted(containerStatus)))
+
+  override def onContainersAllocated(containers: java.util.List[Container]): Unit =
+    containers.foreach(container => listeners.foreach(_.onContainerAllocated(container)))
+
+  override def onShutdownRequest: Unit = listeners.foreach(_.onReboot)
+
+  override def onNodesUpdated(updatedNodes: java.util.List[NodeReport]): Unit = Unit
+
+  // TODO need to think about a meaningful measure of Samza's progress
+  override def getProgress: Float = 0.0F
+
+  override def onError(e: Throwable): Unit = {
+    error("Error occured in amClient's callback", e)
+    storedException = e
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
index 5d09265..8cb9490 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
@@ -18,27 +18,26 @@
  */
 
 package org.apache.samza.job.yarn
-import grizzled.slf4j.Logging
-import org.apache.samza.SamzaException
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
+
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
+import org.apache.samza.SamzaException
+
+import grizzled.slf4j.Logging
 
 /**
  * Responsible for managing the lifecycle of the application master. Mostly,
  * this means registering and unregistering with the RM, and shutting down
  * when the RM tells us to Reboot.
  */
-class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClient[_], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClientAsync[ContainerRequest]) extends YarnAppMasterListener with Logging {
   var validResourceRequest = true
   var shutdownMessage: String = null
 
   override def onInit() {
     val host = state.nodeHost
 
-    amClient.init(conf);
-    amClient.start
-
     val response = amClient.registerApplicationMaster(host, state.rpcPort, "%s:%d" format (host, state.trackingPort))
 
     // validate that the YARN cluster can handle our container resource requirements
@@ -63,7 +62,6 @@ class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: Samza
   override def onShutdown() {
     info("Shutting down.")
     amClient.unregisterApplicationMaster(state.status, shutdownMessage, null)
-    amClient.stop
   }
 
   override def shouldShutdown = !validResourceRequest

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index 983771d..851aae6 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -51,7 +51,6 @@ class SamzaAppMasterMetrics(
   val registry: ReadableMetricsRegistry) extends MetricsHelper with YarnAppMasterListener with Logging {
 
   val jvm = new JvmMetrics(registry)
-  val mEventLoops = newCounter("event-loops")
   val reporters = config.getMetricReporterNames.map(reporterName => {
     val metricsFactoryClassName = config
       .getMetricsFactoryClass(reporterName)
@@ -82,10 +81,6 @@ class SamzaAppMasterMetrics(
     reporters.values.foreach(_.start)
   }
 
-  override def onEventLoop() {
-    mEventLoops.inc
-  }
-
   override def onShutdown() {
     reporters.values.foreach(_.stop)
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index 9058210..418a981 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -18,38 +18,34 @@
  */
 
 package org.apache.samza.job.yarn
-import org.apache.hadoop.yarn.api.records.ContainerStatus
-import org.apache.hadoop.yarn.api.records.Container
-import org.apache.samza.config.Config
-import grizzled.slf4j.Logging
-import org.apache.samza.config.YarnConfig.Config2Yarn
-import org.apache.samza.config.YarnConfig
-import org.apache.samza.job.CommandBuilder
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.fs.Path
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-import org.apache.samza.util.Util
+
+import java.nio.ByteBuffer
+import java.util.Collections
+
 import scala.collection.JavaConversions._
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.api.records.Priority
-import org.apache.hadoop.yarn.api.records.Resource
-import org.apache.hadoop.yarn.util.Records
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.NMClient
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.LocalResourceType
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import java.util.Collections
-import org.apache.samza.job.ShellCommandBuilder
-import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
-import java.nio.ByteBuffer
-import org.apache.hadoop.yarn.client.api.impl.NMClientImpl
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.util.Records
+import org.apache.samza.config.Config
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.YarnConfig
+import org.apache.samza.config.YarnConfig.Config2Yarn
+import org.apache.samza.job.CommandBuilder
+import org.apache.samza.job.ShellCommandBuilder
+import org.apache.samza.util.Util
+
+import grizzled.slf4j.Logging
 
 object SamzaAppMasterTaskManager {
   val DEFAULT_CONTAINER_MEM = 1024
@@ -66,7 +62,7 @@ case class TaskFailure(val count: Int, val lastFailure: Long)
  * containers, handling failures, and notifying the application master that the
  * job is done.
  */
-class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClient[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClientAsync[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
   import SamzaAppMasterTaskManager._
 
   state.taskCount = config.getTaskCount match {
@@ -79,14 +75,15 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
   val allSystemStreamPartitions = Util.getInputStreamPartitions(config)
   var taskFailures = Map[Int, TaskFailure]()
   var tooManyFailedContainers = false
-  var containerManager: NMClientImpl = null
+  // TODO we might want to use NMClientAsync as well
+  var containerManager: NMClient = null
 
   override def shouldShutdown = state.completedTasks == state.taskCount || tooManyFailedContainers
 
   override def onInit() {
     state.neededContainers = state.taskCount
     state.unclaimedTasks = (0 until state.taskCount).toSet
-    containerManager = new NMClientImpl()
+    containerManager = NMClient.createNMClient()
     containerManager.init(conf)
     containerManager.start
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
deleted file mode 100644
index e45c177..0000000
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import scala.collection.JavaConversions._
-import grizzled.slf4j.Logging
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.api.records.AMCommand._
-import org.apache.samza.SamzaException
-
-/**
- * YARN's API is somewhat clunky. Most implementations just sit in a loop, and
- * poll the resource manager every N seconds (see the distributed shell
- * example). To make life slightly better, Samza separates the polling logic
- * from the application master logic, and we convert synchronous polling calls
- * to callbacks, which are more intuitive when dealing with event based
- * paradigms like YARN.
- *
- * <br/><br/>
- *
- * SamzaAppMaster uses this class to wire up all of Samza's application master
- * listeners.
- */
-class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener], amClient: AMRMClient[_]) extends Logging {
-  var isShutdown = false
-
-  def this(listeners: List[YarnAppMasterListener], amClient: AMRMClient[_]) = this(1000, listeners, amClient)
-
-  def run {
-    try {
-      listeners.foreach(_.onInit)
-
-      while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || _)) {
-        val response = amClient.allocate(0)
-
-        if (response.getAMCommand != null) {
-          response.getAMCommand match {
-            case AM_RESYNC | AM_SHUTDOWN =>
-              listeners.foreach(_.onReboot)
-            case _ =>
-              val msg = "Unhandled value of AMCommand: " + response.getAMCommand
-              error(msg);
-              throw new SamzaException(msg);
-          }
-        }
-
-        listeners.foreach(_.onEventLoop)
-        response.getCompletedContainersStatuses.foreach(containerStatus => listeners.foreach(_.onContainerCompleted(containerStatus)))
-        response.getAllocatedContainers.foreach(container => listeners.foreach(_.onContainerAllocated(container)))
-
-        try {
-          Thread.sleep(pollIntervalMs)
-        } catch {
-          case e: InterruptedException => {
-            isShutdown = true
-            info("got interrupt in app master thread, so shutting down")
-          }
-        }
-      }
-    } finally {
-      listeners.foreach(listener => try {
-        listener.onShutdown
-      } catch {
-        case e: Exception => warn("Listener %s failed to shutdown." format listener, e)
-      })
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
index 1353e86..6bf3046 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
@@ -65,9 +65,4 @@ trait YarnAppMasterListener {
    */
   def onContainerCompleted(containerStatus: ContainerStatus) {}
 
-  /**
-   * Invoked by YarnAppMaster once per listener, every time it loops around to
-   * poll the RM again.
-   */
-  def onEventLoop() {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
new file mode 100644
index 0000000..190ce28
--- /dev/null
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import scala.annotation.elidable
+import scala.annotation.elidable.ASSERTION
+
+import org.apache.hadoop.yarn.api.records.{ Container, ContainerStatus }
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.junit.Test
+
+import TestSamzaAppMasterTaskManager._
+
+class TestSamzaAppMaster {
+  @Test
+  def testAppMasterShouldShutdown {
+    val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
+    val listener = new YarnAppMasterListener {
+      var init = 0
+      var shutdown = 0
+      var allocated = 0
+      var complete = 0
+      override def shouldShutdown = true
+      override def onInit() {
+        init += 1
+      }
+      override def onShutdown() {
+        shutdown += 1
+      }
+      override def onContainerAllocated(container: Container) {
+        allocated += 1
+      }
+      override def onContainerCompleted(containerStatus: ContainerStatus) {
+        complete += 1
+      }
+    }
+    SamzaAppMaster.listeners = List(listener)
+    SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
+    assert(listener.init == 1)
+    assert(listener.shutdown == 1)
+  }
+
+  @Test
+  def testAppMasterShouldShutdownWithFailingListener {
+    val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
+    val listener1 = new YarnAppMasterListener {
+      var shutdown = 0
+      override def shouldShutdown = true
+      override def onShutdown() {
+        shutdown += 1
+        throw new RuntimeException("Some weird failure")
+      }
+    }
+    val listener2 = new YarnAppMasterListener {
+      var shutdown = 0
+      override def shouldShutdown = true
+      override def onShutdown() {
+        shutdown += 1
+      }
+    }
+    // listener1 will throw an exception in shutdown, and listener2 should still get called
+    SamzaAppMaster.listeners = List(listener1, listener2)
+    SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
+    assert(listener1.shutdown == 1)
+    assert(listener2.shutdown == 1)
+  }
+
+  @Test
+  def testAppMasterShouldShutdownWithInterrupt {
+    val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
+    val listener = new YarnAppMasterListener {
+      var init = 0
+      var shutdown = 0
+      override def shouldShutdown = false
+      override def onInit() {
+        init += 1
+      }
+      override def onShutdown() {
+        shutdown += 1
+      }
+    }
+    val thread = new Thread {
+      override def run {
+        SamzaAppMaster.listeners = List(listener)
+        SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
+      }
+    }
+    thread.start
+    thread.interrupt
+    thread.join
+    assert(listener.init == 1)
+    assert(listener.shutdown == 1)
+  }
+
+  @Test
+  def testAppMasterShouldForwardAllocatedAndCompleteContainers {
+    val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(getContainer(null)), List(getContainerStatus(null, 1, null)))))
+    val listener = new YarnAppMasterListener {
+      var allocated = 0
+      var complete = 0
+      override def onInit(): Unit = amClient.registerApplicationMaster("", -1, "")
+      override def shouldShutdown = (allocated >= 1 && complete >= 1)
+      override def onContainerAllocated(container: Container) {
+        allocated += 1
+      }
+      override def onContainerCompleted(containerStatus: ContainerStatus) {
+        complete += 1
+      }
+    }
+    SamzaAppMaster.listeners = List(listener)
+    SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
+    // heartbeat may be triggered more than once
+    assert(listener.allocated >= 1)
+    assert(listener.complete >= 1)
+  }
+
+  @Test
+  def testAppMasterShouldReboot {
+    val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(true, List(), List())))
+    val listener = new YarnAppMasterListener {
+      var reboot = 0
+      override def onInit(): Unit = amClient.registerApplicationMaster("", -1, "")
+      override def shouldShutdown = reboot >= 1
+      override def onReboot() {
+        reboot += 1
+      }
+    }
+    SamzaAppMaster.listeners = List(listener)
+    SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
+    // heartbeat may be triggered more than once
+    assert(listener.reboot >= 1)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
index 4ee77e8..cce63eb 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
@@ -18,28 +18,26 @@
  */
 
 package org.apache.samza.job.yarn
-import org.junit.Assert._
-import org.junit.Test
+
+import java.nio.ByteBuffer
+
+import scala.annotation.elidable
+import scala.annotation.elidable.ASSERTION
+
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-import org.apache.hadoop.yarn.api.records.ContainerId
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-import org.apache.hadoop.yarn.api.records.Priority
-import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
-import org.apache.hadoop.yarn.api.records.Resource
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
-import org.apache.hadoop.service._
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync.CallbackHandler
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
+import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.samza.SamzaException
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType
-import java.nio.ByteBuffer
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito
 
 class TestSamzaAppMasterLifecycle {
-  val amClient = new AMRMClientImpl() {
+  val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, Mockito.mock(classOf[CallbackHandler])) {
     var host = ""
     var port = 0
     var status: FinalApplicationStatus = null
@@ -61,7 +59,6 @@ class TestSamzaAppMasterLifecycle {
         override def setClientToAMTokenMasterKey(buffer: ByteBuffer) {}
       }
     }
-    override def allocate(progressIndicator: Float): AllocateResponse = null
     override def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
       appMessage: String,
       appTrackingUrl: String) {
@@ -70,19 +67,16 @@ class TestSamzaAppMasterLifecycle {
     override def releaseAssignedContainer(containerId: ContainerId) {}
     override def getClusterNodeCount() = 1
 
-    override def init(config: Configuration) {}
-    override def start() {}
-    override def stop() {}
-    override def getName(): String = ""
-    override def getConfig() = null
-    override def getStartTime() = 0L
+    override def serviceInit(config: Configuration) {}
+    override def serviceStart() {}
+    override def serviceStop() {}
   }
 
   @Test
   def testLifecycleShouldRegisterOnInit {
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
     state.rpcPort = 1
-    val saml = new SamzaAppMasterLifecycle(512, 2, state, amClient, new YarnConfiguration)
+    val saml = new SamzaAppMasterLifecycle(512, 2, state, amClient)
     saml.onInit
     assert(amClient.host == "test")
     assert(amClient.port == 1)
@@ -93,7 +87,7 @@ class TestSamzaAppMasterLifecycle {
   def testLifecycleShouldUnregisterOnShutdown {
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     state.status = FinalApplicationStatus.SUCCEEDED
-    new SamzaAppMasterLifecycle(128, 1, state, amClient, new YarnConfiguration).onShutdown
+    new SamzaAppMasterLifecycle(128, 1, state, amClient).onShutdown
     assert(amClient.status == FinalApplicationStatus.SUCCEEDED)
   }
 
@@ -101,7 +95,7 @@ class TestSamzaAppMasterLifecycle {
   def testLifecycleShouldThrowAnExceptionOnReboot {
     var gotException = false
     try {
-      new SamzaAppMasterLifecycle(368, 1, null, amClient, new YarnConfiguration).onReboot
+      new SamzaAppMasterLifecycle(368, 1, null, amClient).onReboot
     } catch {
       // expected
       case e: SamzaException => gotException = true
@@ -113,8 +107,8 @@ class TestSamzaAppMasterLifecycle {
   def testLifecycleShouldShutdownOnInvalidContainerSettings {
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
     state.rpcPort = 1
-    List(new SamzaAppMasterLifecycle(768, 1, state, amClient, new YarnConfiguration),
-      new SamzaAppMasterLifecycle(368, 3, state, amClient, new YarnConfiguration)).map(saml => {
+    List(new SamzaAppMasterLifecycle(768, 1, state, amClient),
+      new SamzaAppMasterLifecycle(368, 3, state, amClient)).map(saml => {
         saml.onInit
         assertTrue(saml.shouldShutdown)
       })

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 9d832ae..7fd80d5 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -18,27 +18,33 @@
  */
 
 package org.apache.samza.job.yarn
-import org.junit.Assert._
-import org.junit.Test
-import org.apache.samza.config.Config
-import org.apache.samza.config.MapConfig
-import org.apache.samza.Partition
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.util.ConverterUtils
+
+import scala.annotation.elidable
+import scala.annotation.elidable.ASSERTION
 import scala.collection.JavaConversions._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
+
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
+import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.api.records.NodeReport
-import TestSamzaAppMasterTaskManager._
-import org.apache.samza.system.{SystemStreamPartition, SystemFactory}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.samza.Partition
+import org.apache.samza.config.Config
+import org.apache.samza.config.MapConfig
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.util.Util
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
+import org.apache.samza.util.Util
+import org.junit.Assert._
+import org.junit.Test
+
+import TestSamzaAppMasterTaskManager._
 
 object TestSamzaAppMasterTaskManager {
   def getContainer(containerId: ContainerId) = new Container {
@@ -80,7 +86,11 @@ object TestSamzaAppMasterTaskManager {
     override def setDiagnostics(diagnostics: String) = {}
   }
 
-  def getAmClient = (response: AllocateResponse) => new AMRMClientImpl[ContainerRequest] {
+  def getAmClient = (amClient: TestAMRMClientImpl) => new AMRMClientAsyncImpl(amClient, 1, SamzaAppMaster) {
+    def getClient: TestAMRMClientImpl = amClient
+  }
+
+  class TestAMRMClientImpl(response: AllocateResponse) extends AMRMClientImpl[ContainerRequest] {
     var requests: List[ContainerRequest] = List[ContainerRequest]()
 
     def getRelease = release
@@ -91,13 +101,9 @@ object TestSamzaAppMasterTaskManager {
     override def addContainerRequest(req: ContainerRequest) { requests ::= req }
     override def removeContainerRequest(req: ContainerRequest) {}
     override def getClusterNodeCount() = 1
-
-    override def init(config: Configuration) {}
-    override def start() {}
-    override def stop() {}
-    override def getName(): String = ""
-    override def getConfig() = null
-    override def getStartTime() = 0L
+    override def serviceInit(config: Configuration) {}
+    override def serviceStart() {}
+    override def serviceStop() {}
   }
 
   def getAppMasterResponse(reboot: Boolean, containers: List[Container], completed: List[ContainerStatus]) =
@@ -111,10 +117,10 @@ object TestSamzaAppMasterTaskManager {
       override def getCompletedContainersStatuses() = completed
       override def setCompletedContainersStatuses(containers: java.util.List[ContainerStatus]) {}
       override def setUpdatedNodes(nodes: java.util.List[NodeReport]) {}
-      override def getUpdatedNodes = null
+      override def getUpdatedNodes = List[NodeReport]()
       override def getNumClusterNodes = 1
       override def setNumClusterNodes(num: Int) {}
-      override def getNMTokens = null
+      override def getNMTokens = List[NMToken]()
       override def setNMTokens(nmTokens: java.util.List[NMToken]) {}
       override def setAMCommand(command: AMCommand) {}
       override def getPreemptionMessage = null
@@ -164,7 +170,7 @@ class TestSamzaAppMasterTaskManager {
 
   @Test
   def testAppMasterShouldRequestANewContainerWhenATaskFails {
-    val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
+    val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
       override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
@@ -179,20 +185,20 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here"))
     assert(taskManager.shouldShutdown == false)
     // 2. First is from onInit, second is from onContainerCompleted, since it failed.
-    assertEquals(2, amClient.requests.size)
-    assertEquals(0, amClient.getRelease.size)
+    assertEquals(2, amClient.getClient.requests.size)
+    assertEquals(0, amClient.getClient.getRelease.size)
     assertFalse(taskManager.shouldShutdown)
     // Now trigger an AM shutdown since our retry count is 1, and we're failing twice
     taskManager.onContainerAllocated(getContainer(container2))
     taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here"))
-    assertEquals(2, amClient.requests.size)
-    assertEquals(0, amClient.getRelease.size)
+    assertEquals(2, amClient.getClient.requests.size)
+    assertEquals(0, amClient.getClient.getRelease.size)
     assertTrue(taskManager.shouldShutdown)
   }
 
   @Test
   def testAppMasterShouldRequestANewContainerWhenATaskIsReleased {
-    val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
+    val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     state.taskCount = 2
     var containersRequested = 0
@@ -213,8 +219,8 @@ class TestSamzaAppMasterTaskManager {
     assert(taskManager.shouldShutdown == false)
     taskManager.onInit
     assert(taskManager.shouldShutdown == false)
-    assert(amClient.requests.size == 1)
-    assert(amClient.getRelease.size == 0)
+    assert(amClient.getClient.requests.size == 1)
+    assert(amClient.getClient.getRelease.size == 0)
 
     // allocate container 2
     taskManager.onContainerAllocated(getContainer(container2))
@@ -231,13 +237,13 @@ class TestSamzaAppMasterTaskManager {
     assert(state.runningTasks.size == 1)
     assert(state.taskPartitions.size == 1)
     assert(state.unclaimedTasks.size == 0)
-    assert(amClient.requests.size == 1)
-    assert(amClient.getRelease.size == 1)
-    assert(amClient.getRelease.head.equals(container3))
+    assert(amClient.getClient.requests.size == 1)
+    assert(amClient.getClient.getRelease.size == 1)
+    assert(amClient.getClient.getRelease.head.equals(container3))
 
     // reset the helper state, so we can make sure that releasing the container (next step) doesn't request more resources
-    amClient.requests = List()
-    amClient.resetRelease
+    amClient.getClient.requests = List()
+    amClient.getClient.resetRelease
 
     // now release the container, and make sure the AM doesn't ask for more
     assert(taskManager.shouldShutdown == false)
@@ -247,15 +253,15 @@ class TestSamzaAppMasterTaskManager {
     assert(state.runningTasks.size == 1)
     assert(state.taskPartitions.size == 1)
     assert(state.unclaimedTasks.size == 0)
-    assert(amClient.requests.size == 0)
-    assert(amClient.getRelease.size == 0)
+    assert(amClient.getClient.requests.size == 0)
+    assert(amClient.getClient.getRelease.size == 0)
 
     // pretend container 2 is released due to an NM failure, and make sure that the AM requests a new container
     assert(taskManager.shouldShutdown == false)
     taskManager.onContainerCompleted(getContainerStatus(container2, -100, "pretend the container was 'lost' due to an NM failure"))
     assert(taskManager.shouldShutdown == false)
-    assert(amClient.requests.size == 1)
-    assert(amClient.getRelease.size == 0)
+    assert(amClient.getClient.requests.size == 1)
+    assert(amClient.getClient.getRelease.size == 0)
   }
 
   @Test
@@ -263,7 +269,7 @@ class TestSamzaAppMasterTaskManager {
     val map = new java.util.HashMap[String, String](config)
     map.put("yarn.container.count", "2")
     val newConfig = new MapConfig(map)
-    val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
+    val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     state.taskCount = 2
     var containersStarted = 0
@@ -278,8 +284,8 @@ class TestSamzaAppMasterTaskManager {
     assert(taskManager.shouldShutdown == false)
     taskManager.onInit
     assert(taskManager.shouldShutdown == false)
-    assert(amClient.requests.size == 2)
-    assert(amClient.getRelease.size == 0)
+    assert(amClient.getClient.requests.size == 2)
+    assert(amClient.getClient.getRelease.size == 0)
     taskManager.onContainerAllocated(getContainer(container2))
     assert(state.neededContainers == 1)
     assert(state.runningTasks.size == 1)
@@ -330,7 +336,7 @@ class TestSamzaAppMasterTaskManager {
 
   @Test
   def testAppMasterShouldReleaseExtraContainers {
-    val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
+    val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     var containersRequested = 0
     var containersStarted = 0
@@ -350,8 +356,8 @@ class TestSamzaAppMasterTaskManager {
     assert(taskManager.shouldShutdown == false)
     taskManager.onInit
     assert(taskManager.shouldShutdown == false)
-    assert(amClient.requests.size == 1)
-    assert(amClient.getRelease.size == 0)
+    assert(amClient.getClient.requests.size == 1)
+    assert(amClient.getClient.getRelease.size == 0)
     assert(state.neededContainers == 1)
     assert(state.runningTasks.size == 0)
     assert(state.taskPartitions.size == 0)
@@ -370,9 +376,9 @@ class TestSamzaAppMasterTaskManager {
     assert(state.unclaimedTasks.size == 0)
     assert(containersRequested == 1)
     assert(containersStarted == 1)
-    assert(amClient.requests.size == 1)
-    assert(amClient.getRelease.size == 1)
-    assert(amClient.getRelease.head.equals(container3))
+    assert(amClient.getClient.requests.size == 1)
+    assert(amClient.getClient.getRelease.size == 1)
+    assert(amClient.getClient.getRelease.head.equals(container3))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
deleted file mode 100644
index 98f7844..0000000
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-import org.junit.Assert._
-import org.junit.Test
-import TestSamzaAppMasterTaskManager._
-import org.apache.hadoop.yarn.api.records.Container
-import org.apache.hadoop.yarn.api.records.ContainerStatus
-import org.apache.hadoop.yarn.api.records.ResourceRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.ContainerId
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-
-class TestYarnAppMaster {
-  @Test
-  def testAppMasterShouldShutdown {
-    val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
-    val listener = new YarnAppMasterListener {
-      var init = 0
-      var shutdown = 0
-      var allocated = 0
-      var complete = 0
-      override def shouldShutdown = true
-      override def onInit() {
-        init += 1
-      }
-      override def onShutdown() {
-        shutdown += 1
-      }
-      override def onContainerAllocated(container: Container) {
-        allocated += 1
-      }
-      override def onContainerCompleted(containerStatus: ContainerStatus) {
-        complete += 1
-      }
-    }
-    new YarnAppMaster(List(listener), amClient).run
-    assert(listener.init == 1)
-    assert(listener.shutdown == 1)
-  }
-
-  @Test
-  def testAppMasterShouldShutdownWithFailingListener {
-    val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
-    val listener1 = new YarnAppMasterListener {
-      var shutdown = 0
-      override def shouldShutdown = true
-      override def onShutdown() {
-        shutdown += 1
-        throw new RuntimeException("Some weird failure")
-      }
-    }
-    val listener2 = new YarnAppMasterListener {
-      var shutdown = 0
-      override def shouldShutdown = true
-      override def onShutdown() {
-        shutdown += 1
-      }
-    }
-    // listener1 will throw an exception in shutdown, and listener2 should still get called 
-    new YarnAppMaster(List(listener1, listener2), amClient).run
-    assert(listener1.shutdown == 1)
-    assert(listener2.shutdown == 1)
-  }
-
-  @Test
-  def testAppMasterShouldShutdownWithInterrupt {
-    val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
-    val listener = new YarnAppMasterListener {
-      var init = 0
-      var shutdown = 0
-      override def shouldShutdown = false
-      override def onInit() {
-        init += 1
-      }
-      override def onShutdown() {
-        shutdown += 1
-      }
-    }
-    val am = new YarnAppMaster(List(listener), amClient)
-    val thread = new Thread {
-      override def run {
-        am.run
-      }
-    }
-    thread.start
-    thread.interrupt
-    thread.join
-    assert(listener.init == 1)
-    assert(listener.shutdown == 1)
-  }
-
-  @Test
-  def testAppMasterShouldForwardAllocatedAndCompleteContainers {
-    val amClient = getAmClient(getAppMasterResponse(false, List(getContainer(null)), List(getContainerStatus(null, 1, null))))
-    val listener = new YarnAppMasterListener {
-      var allocated = 0
-      var complete = 0
-      override def shouldShutdown = (allocated == 1 && complete == 1)
-      override def onContainerAllocated(container: Container) {
-        allocated += 1
-      }
-      override def onContainerCompleted(containerStatus: ContainerStatus) {
-        complete += 1
-      }
-    }
-    new YarnAppMaster(List(listener), amClient).run
-    assert(listener.allocated == 1)
-    assert(listener.complete == 1)
-  }
-
-  @Test
-  def testAppMasterShouldReboot {
-    val amClient = getAmClient(getAppMasterResponse(true, List(), List()))
-    val listener = new YarnAppMasterListener {
-      var reboot = 0
-      override def shouldShutdown = reboot == 1
-      override def onReboot() {
-        reboot += 1
-      }
-    }
-    new YarnAppMaster(List(listener), amClient).run
-    assert(listener.reboot == 1)
-  }
-}